当前位置: 首页 > news >正文

RabbitMQ实战学习

RabbitMQ实战学习

文章目录

  • RabbitMQ实战学习
    • RabbitMQ常用资料
      • 1、安装教程
      • 2、使用安装包
      • 3、常用命令
      • 4、验证访问
      • 5、代码示例
    • 一、RabbitMQ基本概念
        • 1.1. MQ概述
        • 1.2 MQ 的优势和劣势
        • 1.3 MQ 的优势
          • 1. 应用解耦
          • 2. 异步提速
          • 3. 削峰填谷
        • 1.4 MQ 的劣势
        • 1.5 RabbitMQ 基础架构
        • 1.6 JMS
    • 二、RabbitMQ学习
      • 1、五种工作模式
        • 1.1 简单模式
        • 1.2 work queues模式
        • 1.3 Pub/Sub 订阅模式
        • 1.4 Routing 路由模式
        • 1.5 Topics 通配符模式
      • 2、工作模式总结
      • 3、SpringBoot 整合RabbitMQ
        • 3.1 生产者
        • 3.2 消费者
        • 3.3 小结
    • 三、常见问题
      • 1、Erlang和RabbitMQ版本不匹配
      • 2、找不到ERLANG_HOME
      • 3、RabbitMQ和Erlang的版本对应关系

RabbitMQ常用资料

1、安装教程

https://blog.csdn.net/l1090739767/article/details/116302051

2、使用安装包

rabbitmq-server

https://repo.huaweicloud.com/rabbitmq-server/v3.12.1/

erlang

https://erlang.org/download/

3、常用命令

rabbitmq-service.bat start # 启动服务
service rabbitmq-server start 
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

如果环境变量不生效,可以进行强制设置

#设置ERLANG_HOME的路径地址
set ERLANG_HOME=E:\app\MQ\Erlang OTP

4、验证访问

http://localhost:15672/

5、代码示例

码云仓库

https://gitee.com/zhang_w_b/rabbit-mq

(1)RabbitMQ五种工作模式

rabbitmq-producer

rabbitmq-consumer

(2)SpringBoot 整合RabbitMQ

rabbitmq-producer

rabbitmq-consumer

一、RabbitMQ基本概念

1.1. MQ概述
  • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信

在这里插入图片描述

小结

  1. MQ,消息队列,存储消息的中间件
  2. 分布式系统通信两种方式:直接远程调用借助第三方 完成间接通信
  3. 发送方称为生产者,接收方称为消费者
1.2 MQ 的优势和劣势

优势:

  • 应用解耦
  • 异步提速
  • 削峰填古

劣势:

  • 系统可用性降低
  • 系统复杂性提高
  • 一致性问题
1.3 MQ 的优势
1. 应用解耦

没有使用MQ:
在这里插入图片描述- 系统的耦合性越高容错性就越低可维护性就越低

使用MQ:
在这里插入图片描述

  • 使用 MQ 使得应用间解耦提升容错性和可维护性
2. 异步提速

没有使用MQ:
在这里插入图片描述

  • 一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
  • 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

使用MQ:
在这里插入图片描述

  • 用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
  • 提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
3. 削峰填谷

没有使用MQ:
在这里插入图片描述使用MQ:

在这里插入图片描述在这里插入图片描述

  • 使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ
    ,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷
  • 使用MQ后,可以提高系统稳定性。

小结:

  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性
1.4 MQ 的劣势

在这里插入图片描述

  • 系统可用性降低

系统引入的外部依赖越多系统稳定性越差一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

  • 系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

小结

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • 容许短暂的不一致性
  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
1.5 RabbitMQ 基础架构

在这里插入图片描述

RabbitMQ 中的相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出多个vhost每个用户在自己的 vhost创建 exchange/queue
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。**常用的类型有:direct (point-to-point), topic (publish-subscribe) and
    fanout (multicast)
  • Queue消息最终被送到这里等待 consumer 取走
  • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding
    信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1.6 JMS
  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

小结

  • RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品
  • RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
  • AMQP 是协议,类比HTTP。
  • JMS 是 API 规范接口,类比 JDBC。

二、RabbitMQ学习

1、五种工作模式

简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式

1.1 简单模式

上述的入门案例中其实使用的是如下的简单模式:

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

代码示例:

依赖

    <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

生产者:

package com.zwb.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100");   //默认localhostfactory.setPort(5672);     //默认5672factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库factory.setUsername("heima");factory.setPassword("heima");//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue 暂时不用交换机/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*///如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建channel.queueDeclare("hello_world",true,false,false,null);//6.发送消息/*String exchange, String routingKey, boolean mandatory, byte[] body参数:1. exchange:交换机名称,简单模式下交换机会使用默认的 ""2. routingKey: 路由名称3. props: 配置信息4. body: 发送消息数据*/String body="hello rabbitmq----测试内容222";channel.basicPublish("","hello_world",null,body.getBytes());//7、释放资源/*channel.close();connection.close();*/}
}

消费者:

package com.zwb.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1、创建工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100"); //默认localhostfactory.setPort(5672);//默认5672factory.setVirtualHost("/itcast");// 默认/factory.setUsername("heima");// heimafactory.setPassword("heima");// heima//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5、创建队列Queue 暂时不用交换机/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*///如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建channel.queueDeclare("hello_world",true,false,false,null);//6、接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel) {/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));//byte数组转string字符串}};channel.basicConsume("hello_world",true,consumer);//关闭资源?不要,因为消费者要监听}
}
1.2 work queues模式

(1)模式说明

在这里插入图片描述

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端多个消费端共同消费同一个队列中的消息。(队列存在竞争关系,可以理解为抢红包
  • 应用场景:对于任务过重任务较多情况使用工作队列可以提高任务处理的速度。

(2)代码编写

Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。

Producer_WorkQueues

package com.itheima.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;//简单模式
//发送消息
public class Producer_WorkQueues {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.创建队列Queue 暂时不用交换机/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*///如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建channel.queueDeclare("work_queues",true,false,false,null);//6.发送消息/*String exchange, String routingKey, boolean mandatory, byte[] body参数:1. exchange:交换机名称,简单模式下交换机会使用默认的 ""2. routingKey: 路由名称3. props: 配置信息4. body: 发送消息数据*/for (int i = 1; i <=10; i++) {String body=i+"hello rabbitmq----";channel.basicPublish("","work_queues",null,body.getBytes());}//7.释放资源/*channel.close();connection.close();*/}
}

Consumer_WorkQueues1

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.创建队列Queue 暂时不用交换机/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*///如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建channel.queueDeclare("work_queues",true,false,false,null);//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串}};channel.basicConsume("work_queues",true,consumer);//关闭资源?不要,因为消费者要监听}
}

Consumer_WorkQueues2

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.创建队列Queue 暂时不用交换机/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*///如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建channel.queueDeclare("work_queues",true,false,false,null);//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串}};channel.basicConsume("work_queues",true,consumer);//关闭资源?不要,因为消费者要监听}
}

(3)测试

注意:先运行消费者1和消费者2,再运行生产者

结果如下:消费者1消费消息序号 1,3,5,7,9
消费者2消费消息序号 2,4,6,8,10

在这里插入图片描述

在这里插入图片描述

(4) 小结

  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
  2. Work Queues 对于任务过重任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
1.3 Pub/Sub 订阅模式

(1)模式说明

在这里插入图片描述

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,

    接收生产者发送的消息

    。另一方面,

    知道如何处理消息

    ,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    1. Fanout:广播,将消息交给所有绑定到交换机的队列
    2. Direct:定向,把消息交给符合指定routing key 的队列
    3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

(2)代码编写

生产者Producer_PubSub

package com.itheima.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;//发布订阅模式
//发送消息
public class Producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.创建交换机/*exchangeDeclare(String exchange,  交换机名称BuiltinExchangeType type,  交换机类型 枚举类型DIRECT("direct"), :定向FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"),:通配符方式HEADERS("headers");参数匹配boolean durable, 是否持久化boolean autoDelete, 是否自动删除boolean internal, 内部使用一般falseMap<String, Object> arguments) 参数列表*/String exchangeName="test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6.创建两个队列String queue1Name="test_fanout_queue1";String queue2Name="test_fanout_queue2";/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*/channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7,绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1.queue: 队列名称2.exchange:交换机3.routingKey:路由键:绑定规则如果交换机的类型为fanout,routingKey设置为""*/channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");//8.发送消息String body="日志信息:张三调用了findAll方法...日志级别:info";channel.basicPublish(exchangeName,"",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576

Consumer_PubSub1

package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_fanout_queue1";String queue2Name="test_fanout_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息打印到控制台");}};channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要,因为消费者要监听}
}

Consumer_PubSub2

package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("192.168.101.22");//ip地址 默认localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认guest//3.创建连接 connectionConnection connection = factory.newConnection();//4.创建channelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_fanout_queue1";String queue2Name="test_fanout_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息保存数据库");}};channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要,因为消费者要监听}
}

运行结果:

在这里插入图片描述

在这里插入图片描述

(3)小结

  1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到

  2. 发布订阅模式与工作队列模式的区别:

    工作队列模式不用定义交换机而发布/订阅模式需要定义交换机

    发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)

    发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

1.4 Routing 路由模式

(1)模式说明

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

图解:

在这里插入图片描述

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

(2)代码编写

Producer_Routing

package com.zwb.producer.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100");   //默认localhostfactory.setPort(5672);     //默认5672factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库factory.setUsername("heima");factory.setPassword("heima");//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5、创建交换机/*exchangeDeclare(String exchange,  交换机名称BuiltinExchangeType type,  交换机类型 枚举类型DIRECT("direct"), :定向FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"),:通配符方式HEADERS("headers");参数匹配boolean durable, 是否持久化boolean autoDelete, 是否自动删除boolean internal, 内部使用一般falseMap<String, Object> arguments) 参数列表*/String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null);//6、创建两个队列String queue1Name="test_direct_queue1";String queue2Name="test_direct_queue2";/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*/channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7、绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1.queue: 队列名称2.exchange:交换机3.routingKey:路由键:绑定规则如果交换机的类型为fanout,routingKey设置为""*///队列1channel.queueBind(queue1Name,exchangeName,"error");//队列2channel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"warning");channel.queueBind(queue2Name,exchangeName,"error");//8.发送消息String body="日志信息:张三调用了delete方法出错误了...日志级别:error";channel.basicPublish(exchangeName,"error",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

Consumer_Routing1

package com.zwb.consumer.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//1、创建工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100"); //默认localhostfactory.setPort(5672);//默认5672factory.setVirtualHost("/itcast");// 默认/factory.setUsername("heima");// heimafactory.setPassword("heima");// heima//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_direct_queue1";String queue2Name="test_direct_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息打印到控制台");}};channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要,因为消费者要监听}
}

Consumer_Routing2

package com.zwb.consumer.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {//1、创建工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100"); //默认localhostfactory.setPort(5672);//默认5672factory.setVirtualHost("/itcast");// 默认/factory.setUsername("heima");// heimafactory.setPassword("heima");// heima//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_direct_queue1";String queue2Name="test_direct_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息打印到控制台");}};channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要,因为消费者要监听}}

(3) 运行结果

在这里插入图片描述

小结

  • Routing 模式:要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列
1.5 Topics 通配符模式

(1)模式说明

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic
    类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者item.insert,item.* 只能匹配 item.insert

在这里插入图片描述

图解:

  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
  • *代表一个单词,#代表0个或者多个单词

(2)代码实现

Producer_Topics

package com.zwb.producer.topics;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Topics {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100");   //默认localhostfactory.setPort(5672);     //默认5672factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库factory.setUsername("heima");factory.setPassword("heima");//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5、创建交换机/*exchangeDeclare(String exchange,  交换机名称BuiltinExchangeType type,  交换机类型 枚举类型DIRECT("direct"), :定向FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"),:通配符方式HEADERS("headers");参数匹配boolean durable, 是否持久化boolean autoDelete, 是否自动删除boolean internal, 内部使用一般falseMap<String, Object> arguments) 参数列表*/String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,null);//6、创建两个队列String queue1Name="test_topic_queue1";String queue2Name="test_topic_queue2";/*String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)1.queue 队列名称2.durable 是否持久化,当mq重启之后,还在3.exclusive:*是否独占:只能有一个消费者监听队列*当connection关闭时,是否删除队列4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉5.arguments: 参数*/channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7、绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1.queue: 队列名称2.exchange:交换机3.routingKey:路由键:绑定规则如果交换机的类型为fanout,routingKey设置为""*///routing key:系统的名称,日志级别//--需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,"#.error");channel.queueBind(queue1Name,exchangeName,"order.*");channel.queueBind(queue2Name,exchangeName,"*.*");//8.发送消息String body="日志信息:张三调用了findAll方法...日志级别:info";channel.basicPublish(exchangeName,"order.info",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

Consumer_Topic1

package com.zwb.consumer.topics;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {//1、创建工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100"); //默认localhostfactory.setPort(5672);//默认5672factory.setVirtualHost("/itcast");// 默认/factory.setUsername("heima");// heimafactory.setPassword("heima");// heima//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_topic_queue1";String queue2Name="test_topic_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息打印到控制台");}};channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要,因为消费者要监听}}

Consumer_Topic2

package com.zwb.consumer.topics;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic2 {public static void main(String[] args) throws IOException, TimeoutException {//1、创建工厂ConnectionFactory factory = new ConnectionFactory();//2、设置参数//factory.setHost("192.168.1.100"); //默认localhostfactory.setPort(5672);//默认5672factory.setVirtualHost("/itcast");// 默认/factory.setUsername("heima");// heimafactory.setPassword("heima");// heima//3、创建连接ConnectionConnection connection = factory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5.队列已经声明一次了,这里不用再声明队列String queue1Name="test_topic_queue1";String queue2Name="test_topic_queue2";//6.接收消息/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck: 是否自动确认3. callback: 回调对象*/Consumer consumer=new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息。交换机,路由key...3.properties: 配置信息4.body: 数据*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//byte数组转string字符串System.out.println("将日志信息打印到控制台");}};channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要,因为消费者要监听}
}

结果

在这里插入图片描述

小结

  • Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

2、工作模式总结

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、SpringBoot 整合RabbitMQ

3.1 生产者
  1. 创建生产者SpringBoot工程

  2. 引入start,依赖坐标

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. 编写yml配置,基本信息配置

  4. 定义交换机,队列以及绑定关系的配置类

  5. 注入RabbitTemplate,调用方法,完成消息发送

    在这里插入图片描述

    pom.xml

        <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.15</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.rabiitmq</groupId><artifactId>producer-springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>producer-springboot</name><description>producer-springboot</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope></dependency></dependencies>
    

    RabbitMQConfig.java

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitMQConfig {public static final String EXCHANGE_NAME="boot_topic_exchange";public static final String QUEUE_NAME="boot_quque";/**  参数说明:*  id: bean的名称*  name:quque名称*  auto-declare:自动创建*  auto-delete: 自动删除,最后一个消费者和该队列断开连接后,自动删除队列*  durable:是否持久化*  exclusive:是否独占*///1.交换机@Bean("bootExchange")public Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.队列@Bean("bootQueue")public Queue bootQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}//3.队列和交换机的绑定关系@Beanpublic Binding binQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){return  BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();}
    }
    

ProducerSpringbootApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerSpringbootApplication {public static void main(String[] args) {SpringApplication.run(ProducerSpringbootApplication.class, args);}}

application.yml

# 配置rabbitmq的基本信息 IP端口 username passwordspring:
spring:rabbitmq:host: localhostport: 5672virtual-host: /itcastusername: heimapassword: heima

ProducerTest

import com.rabiitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class ProducerSpringbootApplicationTests {//1.注入rabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","这是springboot-rabbitmq的测试消息22");}
}

测试:运行Junit Test

在这里插入图片描述

3.2 消费者
  1. 创建消费者SpringBoot工程

  2. 引入start,依赖坐标

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. 编写yml配置,基本信息配置

  4. 定义监听类,使用@RabbitListener注解完成队列监听。

在这里插入图片描述

ConsumerSpringbootApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerSpringbootApplication {public static void main(String[] args) {SpringApplication.run(ConsumerSpringbootApplication.class, args);}}

RabbitMQListener.java

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMQListener {@RabbitListener(queues = "boot_quque")public void ListenerQueue(Message message){System.out.println(new String(message.getBody()));}
}

application.yml

# 配置rabbitmq的基本信息 IP端口 username passwordspring:
spring:rabbitmq:host: localhostport: 5672virtual-host: /itcastusername: heimapassword: heima

pom.xml

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.15</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.rabbitmq</groupId>
<artifactId>consumer-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer-springboot</name>
<description>consumer-springboot</description>
<properties><java.version>8</java.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope></dependency>
</dependencies>

运行ConsumerSpringbootApplication,消费者从MQ中拿到消息

在这里插入图片描述

3.3 小结
  • SpringBoot提供了快速整合RabbitMQ的方式
  • 基本信息在yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
  • 生产端直接注入RabbitTemplate完成消息发送
  • 消费端直接使用@RabbitListener完成消息接收

三、常见问题

1、Erlang和RabbitMQ版本不匹配

https://blog.csdn.net/weixin_62319133/article/details/127925299

​ 在安装RabbitMQ是会出现无法读取ErlangOTP的情况,多数是因为版本不匹配

​ 1.在官方文档中找到RabbitMQ版本对应的Erlang版本重新下载安装包

​ 文档RabbitMQ Erlang Version Requirements — RabbitMQ

​ 2.下载完成后停止Erlang和RabbitMQ服务并进入原本Erlang和RabbitMQ的安装目录,运行Uninstall.exe

​ 3.卸载完成后检查Erlang和RabbitMQ的安装目录是否删干净

​ 4.删除RabbitMQ和Erlang的所有安装目录。

​ 5.运行CMD–>sc delete RabbitMQ

​ 6.删除目录C:\Windows\System32\config\systemprofile中的.erlang.cookie文件(如果有的话)。

​ 7.删除目录C:\Users\用户名 中的.erlang.cookie文件(如果有的话)。

​ 8.删除目录C:\Users\用户名\AppData\Roaming目录下的RabbitMQ文件夹。

​ 9.打开注册表编辑器,删除表

计算机\HKEY_LOCAL_MACHINE\SOFTWARE\WOW6432Node\Ericsson\Erlang下的子项

​ 10.以管理员运行分别运行Erlang和RabbitMQ安装包即可

2、找不到ERLANG_HOME

rabbitmq启动的时候报错:Please either set ERLANG_HOME to point to your Erlang installation or place

https://blog.csdn.net/weixin_42409107/article/details/100105905

3、RabbitMQ和Erlang的版本对应关系

https://www.rabbitmq.com/which-erlang.html

在这里插入图片描述

相关文章:

RabbitMQ实战学习

RabbitMQ实战学习 文章目录 RabbitMQ实战学习RabbitMQ常用资料1、安装教程2、使用安装包3、常用命令4、验证访问5、代码示例 一、RabbitMQ基本概念1.1. MQ概述1.2 MQ 的优势和劣势1.3 MQ 的优势1. 应用解耦2. 异步提速3. 削峰填谷 1.4 MQ 的劣势1.5 RabbitMQ 基础架构1.6 JMS 二…...

插混、油混、增程式、轻混、强混,啥区别

这里写自定义目录标题 随着我国新能源汽车的大力推进&#xff0c;电车可以说是世界未来的主流&#xff0c;只不过现在是处在一个过渡时代 这是个好时代&#xff0c;因为我们见证并体验着历史过渡的细节 这是个不好的时代&#xff0c;因为我们可能只是未来新新人类的试验品 帮他…...

React 模态框的设计(八)优化补充

在之前的弹窗的设计中&#xff0c;有两处地方现在做一点小小的优化&#xff0c;就是把_Draggable.jsx中的 onPointerEnter 事件 用 useLayoutEffect来规换&#xff0c;效果更佳&#xff0c;同样的&#xff0c;在_ModelContainer.jsx中也是一样。如下所示&#xff1a; _Draggabl…...

知识积累(三):深度学习相关概念(查看检索时看到)

文章目录 1. 知识蒸馏2. 可微搜索索引&#xff08;DSI&#xff09;参考资料 在找论文时&#xff0c;发现的相关概念。 1. 知识蒸馏 知识蒸馏&#xff08;knowledge distillation&#xff09;是模型压缩的一种常用的方法&#xff0c;不同于模型压缩中的剪枝和量化&#xff0c;知…...

计算机专业必看的几部电影

目录 ​编辑 1. 《第九区》&#xff08;District 9&#xff0c;2009&#xff09; 2. 《谍影重重》&#xff08;The Bourne Identity&#xff0c;2002&#xff09; 3. 《源代码》&#xff08;Source Code&#xff0c;2011&#xff09; 4. 《她》&#xff08;Her&#xff0c;…...

工业人工智能需要注意的10件事

我们无法逃避人工智能这个风口&#xff0c;宣传人工智能软件的广告铺天盖地&#xff0c;似乎每个供应商都在推出最新的工具包&#xff0c;每天都有关于 ChatGPT、Bard 等新用例的文章。似乎全世界都在说&#xff1a;你现在需要人工智能&#xff01; 人工智能确实正在成为自动化…...

软考-系统集成项目管理中级-信息系统建设与设计

本章重点考点 1.信息系统的生命周期 信息系统建设的内容主要包括设备采购、系统集成、软件开发和运维服务等。信息系统的生命周期可以分为四个阶段:立项、开发、运维和消亡。 2.信息系统开发方法 信息系统常用的开发方法有结构化方法、原型法、面向对象方法等 1)结构化方法 …...

C++从零开始的打怪升级之路(day39)

这是关于一个普通双非本科大一学生的C的学习记录贴 在此前&#xff0c;我学了一点点C语言还有简单的数据结构&#xff0c;如果有小伙伴想和我一起学习的&#xff0c;可以私信我交流分享学习资料 那么开启正题 今天分享的是关于模板的知识点 1.非类型模板参数 模板参数分为…...

Java面试题之并发

并发 1.并发编程的优缺点?2.并发编程三要素?3.什么叫指令重排?4.如何避免指令重排?5.并发?并行?串行?6.线程和进程的概念和区别?7.什么是上下文切换?8.守护线程和用户线程的定义?9.什么是线程死锁?10.形成死锁的四个条件?11.怎么避免死锁?12.创建线程的四种方式?…...

Python GUI自动化定位代码参考

一、pyautogui原始逻辑 import pyautogui # 获取指定图片在屏幕上的位置 image_path path/to/image.png target_position pyautogui.locateCenterOnScreen(image_path) if target_position is not None: # 获取偏移量 offset_x 10 offset_y 10 # 计算实际点…...

11.网络游戏逆向分析与漏洞攻防-游戏网络架构逆向分析-接管游戏接收网络数据包的操作

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;接管游戏发送数据的操作 码云地址&#xff08;master 分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/titan 码云版本号&#xff1a;8256eb53e8c16281bc1a29cb8d26d352bb5bbf4c 代…...

特斯拉一面算法原题

来自太空的 X 帖子 埃隆马斯克&#xff08;Elon Musk&#xff09;旗下太空探索技术公司 SpaceX 于 2 月 26 号&#xff0c;从太空往社交平台 X&#xff08;前身为推特&#xff0c;已被马斯克全资收购并改名&#xff09;发布帖子。 这是 SpaceX 官号首次通过星链来发送 X 帖子&a…...

【Leetcode每日一题】二分查找 - 山脉数组的峰顶索引(难度⭐⭐)(23)

1. 题目解析 Leetcode链接&#xff1a;852. 山脉数组的峰顶索引 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 核心在于找到题目中所说的峰值所在的下标并返回他们的下标即可。 2. 算法原理 峰顶及两侧数据特点分析 峰顶数据…...

Linux添加用户分组练习

一、复制/etc/skel目录为/home/tuser1&#xff08;/home/tuser1及其内部文件的属组和其它用户均没有任何访问权限&#xff09;。 cp -a /etc/skel /home/tuser1 chown -R tuser1:tuser1 /home/tuser1 chmod -R 700 /home/tuser1 二、编辑/etc/group文件&#xff0c;添加组h…...

云快充充电桩系统设计书

充电桩系统设计书 一、系统设计概述 随着新能源汽车市场的快速发展&#xff0c;充电桩作为电动汽车的重要配套设施&#xff0c;其市场需求日益增长。本系统旨在提供一套稳定、高效、易用的充电桩解决方案&#xff0c;以满足市场上新能源充电桩的主流需求。通过实现云快充V1.6协…...

oracle DG 原理

在Oracle中&#xff0c;什么是DG&#xff1f;DG有哪些优缺点&#xff1f; DG&#xff08;Data Guard&#xff0c;数据卫士&#xff09;不是一个备份恢复的工具&#xff0c;然而&#xff0c;DG却拥有备份的功能&#xff0c;在物理DG下它可以和主库一模一样&#xff0c;但是它存…...

MySQL篇—持久化和非持久化统计信息介绍(第一篇,总共三篇)

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…...

Leetcode—65. 有效数字【困难】

2024每日刷题&#xff08;118&#xff09; Leetcode—65. 有效数字 实现代码 class Solution { public:bool isNumber(string s) {if(s.empty()) {return false;}bool seenNum false;bool seenE false;bool seenDot false;for(int i 0; i < s.size(); i) {switch(s[i]…...

【Java程序设计】【C00322】基于Springboot的高校竞赛管理系统(有论文)

基于Springboot的高校竞赛管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的高校竞赛管理系统&#xff0c;本系统有管理员、老师、专家以及用户四种角色&#xff1b; 管理员&#xff1a;首页、个人中心、管…...

41、网络编程/TCP.UDP通信模型练习20240301

一、编写基于TCP的客户端实现以下功能&#xff1a; 通过键盘按键控制机械臂&#xff1a;w(红色臂角度增大)s&#xff08;红色臂角度减小&#xff09;d&#xff08;蓝色臂角度增大&#xff09;a&#xff08;蓝色臂角度减小&#xff09;按键控制机械臂 1.基于TCP服务器的机械臂…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)

前言&#xff1a; 双亲委派机制对于面试这块来说非常重要&#xff0c;在实际开发中也是经常遇见需要打破双亲委派的需求&#xff0c;今天我们一起来探索一下什么是双亲委派机制&#xff0c;在此之前我们先介绍一下类的加载器。 目录 ​编辑 前言&#xff1a; 类加载器 1. …...

DAY 26 函数专题1

函数定义与参数知识点回顾&#xff1a;1. 函数的定义2. 变量作用域&#xff1a;局部变量和全局变量3. 函数的参数类型&#xff1a;位置参数、默认参数、不定参数4. 传递参数的手段&#xff1a;关键词参数5 题目1&#xff1a;计算圆的面积 任务&#xff1a; 编写一…...

大模型——基于Docker+DeepSeek+Dify :搭建企业级本地私有化知识库超详细教程

基于Docker+DeepSeek+Dify :搭建企业级本地私有化知识库超详细教程 下载安装Docker Docker官网:https://www.docker.com/ 自定义Docker安装路径 Docker默认安装在C盘,大小大概2.9G,做这行最忌讳的就是安装软件全装C盘,所以我调整了下安装路径。 新建安装目录:E:\MyS…...