rabbitmq-java基础详解
一、rabbitmq是什么?
1、MQ定义
MQ(Message Queue)消息队列 主要解决:异步处理、应用解耦、流量削峰等问题,是分布式系统的重要组件,从而实现高性能,高可用,可伸缩和最终一致性的架构,rabbitmq 是 消息队列中的一种。
1.1 异步
通过消息队列,生产者无需等待消费者完成处理即可继续执行其他任务,从而提高系统响应速度和吞吐量。例如,在用户下单后,订单系统可以将订单信息发送到消息队列,然后立即返回给用户确认信息,而物流系统或库存系统则在后台异步地从队列中获取并处理订单。

1.2 解耦
不同应用程序之间通过消息队列通信,不再直接依赖对方的接口调用,当某一方进行升级或重构时,不会影响其他系统的运行。例如,一个支付系统可以向消息队列发布支付成功的通知,而积分系统、仓库系统等分别订阅这些消息来更新各自的业务状态,彼此独立工作。

1.3 削峰
当短时间内有大量的请求涌入系统时,消息队列可以作为缓冲区存储这些请求,以恒定的速度分发给下游服务,避免了因为瞬间高峰导致的服务崩溃。

2、技术背景
2.1 AMQP高级消息队列协议
Advanced Message Queuing Protocol 是一个开放标准的消息中间件协议,它定义了消息代理和应用程序之间的交互方式。RabbitMQ即是基于AMQP协议实现的消息队列产品,提供了一种标准化的方式来保证跨语言和平台的可靠消息传输。
2.2 JMS
-
Java Message Server,Java消息服务应用程序接口,一种规范,和JDBC担任的角色类似
-
是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消 息,进行异步通信
2.3 联系
-
JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式
-
JMS必须是java语言;AMQP只是协议,与语言无关
2.4 Erlang语言
RabbitMQ服务器端是使用Erlang语言编写的,Erlang以其高并发、容错性和分布式计算能力闻名,非常适合用于构建像RabbitMQ这样需要高度稳定和可扩展的消息中间件。
3、为什么使用rabbitmq
- 可靠性:RabbitMQ提供了多种机制保证消息投递的可靠性,包括持久化消息、消息确认机制等。
- 灵活性:通过Exchange、Queue和Routing Key等组件,RabbitMQ支持灵活的消息路由策略,包括发布订阅、路由模式、主题模式等多种模式。
- 扩展性:通过集群和镜像队列等功能,RabbitMQ可以轻松实现水平扩展,满足高可用及高性能的需求。
- 广泛支持:RabbitMQ拥有丰富的客户端库,几乎支持所有主流开发语言,便于开发者快速集成。
- 使用简单方便:安装部署简单,上手门槛低,有强大的WEB管理页面。
4、rabbitmq的各组件功能

- Broker:消息队列服务器实体
- Virtual Host:虚拟主机,是一种逻辑隔离单位,可以在单个RabbitMQ Broker实例上创建多个vhost,每个vhost内部有自己的交换机、队列和权限管理,实现不同项目或租户间资源的隔离。
- Publisher(生产者):负责生成和发布消息到RabbitMQ服务器,可以选择目标交换机并将消息附带特定的路由键。
- Consumer(消费者):从RabbitMQ中接收并消费消息的程序,可以从绑定到特定交换机和路由键的队列中取出消息进行处理。
- Exchange(交换机):根据预定义的类型和路由规则,接收生产者发布的消息,并将其转发到相应的队列。常见的交换机类型有直连(Direct)、主题(Topic)、头部(Headers)和扇出(Fanout)等。
- Queue(队列):存储消息的容器,实际的消息载体,消息会按照路由规则存放在队列中等待消费者消费。
- Banding:绑定,用于消息队列和交换机之间的关联。
- Channel:通道(信道)
- 多路复用连接中的一条独立的双向数据流通道。
- 信道是建立在真实的TCP连接内的 虚拟链接。
- AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的。
- 因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接。
- Routing key:生产者在发布消息时指定的一个标识符,用于决定消息如何被交换机路由到相应的队列。
二、rabbitmq 的使用
1、Linux虚拟机设置
-
rabbitmq的安装通常涉及到如下两个步骤,可以参考博文Linux安装RabbitMQ详细教程(最详细的图文教程)-CSDN博客:
- 安装Erlang:由于RabbitMQ是用Erlang编写的,首先需要在Linux系统中安装Erlang运行环境。
- 安装RabbitMQ:可以通过官方提供的apt或yum仓库进行安装,或者下载源码自行编译安装。
- 安装Erlang:由于RabbitMQ是用Erlang编写的,首先需要在Linux系统中安装Erlang运行环境。
-
启动后台管理插件
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
-
启动、查看状态、重启、关闭 rabbitmq
[root@localhost opt]# systemctl start rabbitmq-server.service[root@localhost opt]# systemctl status rabbitmq-server.service[root@localhost opt]# systemctl restart rabbitmq-server.service[root@localhost opt]# systemctl stop rabbitmq-server.service -
查看进程
[root@localhost opt]# ps -ef | grep rabbitmq -
测试
-
关闭防火墙: systemctl stop firewalld
-
浏览器输入:http://ip:15672
-
默认帐号密码:guest,guest用户默认不允许远程连接
-
创建账号
[root@localhost opt]# rabbitmqctl add_user 你的用户名 你的密码 -
设置用户角色
[root@localhost opt]# rabbitmqctl set_user_tags 你的用户名 administrator -
设置用户权限
[root@localhost opt]# rabbitmqctl set_permissions -p "/" 你的用户名 ".*" ".*" ".*" -
查看当前用户和角色
[root@localhost opt]# rabbitmqctl list_users -
修改用户密码
[root@localhost opt]# rabbitmqctl change_password 你的用户名 新的密码
-
-
web界面介绍:

端口:
5672:RabbitMQ提供给编程语言客户端链接的端口
15672:RabbitMQ管理界面的端口
25672:RabbitMQ集群的端口
2、java使用rabbitmq
2.1 快速入门
-
远程登录创建的账号,在Admin下添加了用户

-
pom依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency></dependencies> -
创建连接工具类
package utils;import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception{//1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2、在工厂对象中设置MQ的连接信息(ip、port、vhost、username、password)factory.setHost("192.168.81.121");factory.setPort(5672);factory.setVirtualHost("/lb");factory.setUsername("lb");factory.setPassword("123123");//3、通过工厂获得与MQ的连接Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws Exception {Connection connection = getConnection();System.out.println("connection = " + connection);connection.close();} } -
运行测试结果

2.2 RabbitMQ模式
5种消息模型,大体分为两类:点对点、发布订阅模式(一对多)
- 点对点:
- 包含三部分:消息队列(queue),发送者(sender),接收者(receiver)
- 每个消息发送到一个特定的队列中,接收者从中获得消息,队列中保留这些消息,直到他们被消费或超时
- 每个消息一个消费者
- 消费者不需要运行,发送者发送的消息可以被直接保存在队列内
- 简单模式和工作队列模式属于这种类型
- 发布订阅:
- 发布订阅多了一部分,交换机,起到将消息路由分发到各个订阅者的作用
- 每个消息可以有多个订阅者
- 消费者需要先订阅,订阅者发布的消息才能被消费
- 消费者需要保持运行状态,才能消费消息
- 发布订阅模式、路由模式、通配符(主题)模式属于这种类型
2.2.1 简单模式

-
生产者
package simplest;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {String msg = "Hello world!";//1、获取连接Connection connection = ConnectionUtil.getConnection();//2、在连接中创建通道(信道)Channel channel = connection.createChannel();/*3、创建消息队列参数1、队列中的名称参数2、队列的数据是否持久化参数3、是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)参数4、是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)参数5、队列参数(没有参数为null)** */channel.queueDeclare("queue1", false, false, false, null);/*4、向指定队列发送消息参数1、交换机名称,简单模式没有交换机,所以名称为""参数2、目标队列的名称参数3、设置消息的属性(没有属性为null)参数4、消息的内容(只接受字节数组)*/channel.basicPublish("", "queue1", null, msg.getBytes());System.out.println("发送:" + msg);//5、释放资源channel.close();connection.close();} } -
消费者
package simplest;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver {public static void main(String[] args) throws Exception {//1、获得连接Connection connection = ConnectionUtil.getConnection();//2、获得通道(信道)Channel channel = connection.createChannel();//3、从信道获得消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*交付处理参数1:收件人信息参数2:包裹上的快递标签参数3:协议的配置参数4:消息*/String s = new String(body);System.out.println("接收到的消息:" + s);}};//4、监听队列 true:自动消息确认channel.basicConsume("queue1", true, consumer);} } -
测试结果
-
先运行sender,此时Queue中存入一个消息:

-
再运行receiver,此时Queue中的消息被消费:

-
-
消息确认机制
消息可以设置手动确认,这样可以保证:
- 消费者接收到消息处理时未发生异常再确认,消息才被删除;
- 发生异常,不确认,消息就不会被删除,防止消息丢失。
修改消费者代码:
package simplest;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class ReceiverByAck {public static void main(String[] args) throws Exception {//1、获得连接Connection connection = ConnectionUtil.getConnection();//2、获得通道Channel channel = connection.createChannel();//3、从channel中获取消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是从队列中获取的消息String s = new String(body);System.out.println("接收消息:" + s);//手动确认(收件人信息,是否同时确认多个消息)System.out.println("消息已接收并正常处理完毕!手动确认回执!");channel.basicAck(envelope.getDeliveryTag(), false);}};//4、设置手动确认channel.basicConsume("queue1", false, consumer);} } -
再次运行sender和receiver测试结果:

2.2.2 工作队列模式

如图,此种模式区别于简单模式主要是多个消费者共同消费消息,但注意,仍然是一个消息对应一个消费者。
-
生产者代码类似简单模式,只是循环发了多条消息
package work;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_work_queue", false, false, false, null);for (int i = 0; i < 100; i++) {String msg = "产生的消息====>>>" + i;channel.basicPublish("", "test_work_queue", null, msg.getBytes());System.out.println(msg);}channel.close();connection.close();}} -
消费者同样声明了消息队列,这样可以提前开启监听消息队列;同时,消费者1延时300ms,消费者2延时1000ms,便于观察;
-
消费者1
package work;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {static int num = 1;//统计消费的消息数目public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取// 使用此方法可以保证先启动消费者不会报错channel.queueDeclare("test_work_queue", false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver1 消费了" + s + "! 总共消费了" + num++ + "条消息!");//延迟时间try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}//手动确认channel.basicAck(envelope.getDeliveryTag(), false);}};//设置监听channel.basicConsume("test_work_queue", false, consumer);} } -
消费者2
package work;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {static int num = 1;//统计消费的消息数目public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取// 使用此方法可以保证先启动消费者不会报错channel.queueDeclare("test_work_queue", false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver2 消费了" + s + "! 总共消费了" + num++ + "条消息!");//延迟时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动确认channel.basicAck(envelope.getDeliveryTag(), false);}};//设置监听channel.basicConsume("test_work_queue", false, consumer);} }
-
-
测试结果:
-
先开启消费者,后台就存在了相应队列:

-
再运行生产者:

-
-
可以看到消费者1消费消息的效率高,但仍然只消费50个,说明生产者的消息是完全均匀分配的,这不符合正常的需求,我们想按照效率分配,需添加如下代码:
channel.basicQos(1);修改后的消费者代码:
-
消费者1
package work;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {static int num = 1;//统计消费的消息数目public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取// 使用此方法可以保证先启动消费者不会报错channel.queueDeclare("test_work_queue", false, false, false, null);channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver1 消费了" + s + "! 总共消费了" + num++ + "条消息!");//延迟时间try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}//手动确认channel.basicAck(envelope.getDeliveryTag(), false);}};//设置监听channel.basicConsume("test_work_queue", false, consumer);} } -
消费者2
package work;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {static int num = 1;//统计消费的消息数目public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取// 使用此方法可以保证先启动消费者不会报错channel.queueDeclare("test_work_queue", false, false, false, null);channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver2 消费了" + s + "! 总共消费了" + num++ + "条消息!");//延迟时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动确认channel.basicAck(envelope.getDeliveryTag(), false);}};//设置监听channel.basicConsume("test_work_queue", false, consumer);} }
-
-
先开启消费者,再运行生产者,测试结果:

注意:能者多劳必须要配合手动的ACK机制才生效
2.2.3 发布订阅模式

发布订阅模式添加了 X(交换机 Exchange),该角色主要实现消息的分发,当多个消息队列绑定了该交换机时,该交换机会把消息广播到所有绑定到它的队列,所以所有订阅了相应队列的消费者都会收到相同的消息。
-
生产者
package fanout;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型)// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)channel.exchangeDeclare("test_fanout_exchange", "fanout");String msg = "hello,everyone!";channel.basicPublish("test_fanout_exchange", "", null, msg.getBytes());System.out.println("生产者:" + msg);channel.close();connection.close();} } -
消费者
-
消费者1
package fanout;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_fanout_queue1", false, false, false, null);//绑定路由channel.queueBind("test_fanout_queue1", "test_fanout_exchange", "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者1: " + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_fanout_queue1", false, consumer);} } -
消费者2
package fanout;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_fanout_queue2", false, false, false, null);//绑定路由channel.queueBind("test_fanout_queue2", "test_fanout_exchange", "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者2: " + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_fanout_queue2", false, consumer);} }
-
-
测试:
-
先运行生产者,因为需要先创建交换机,此步的消息忽略。

-
再运行消费者,消费者的队列绑定交换机。
-
最后运行生产者。

-
2.2.4 路由模式

路由模式可以定向分发消息给不同的队列,区别于发布订阅模式,主要是由 路由key区分了消息的种类,根据不同的消息种类分别分发给对应的消息队列。
-
生产者,发布消息时需要声明绑定哪种key
package direct;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明路由// direct:根据路由键进行定向分发消息channel.exchangeDeclare("test_direct_exchange", "direct");String msg = "用户注册,【userid=100】";channel.basicPublish("test_direct_exchange", "insert", null, msg.getBytes());msg = "用户查询,【userid=200】";channel.basicPublish("test_direct_exchange", "select", null, msg.getBytes());channel.close();connection.close();} } -
消费者,一个绑定增删改的路由key,另一个绑定查询的路由key
-
消费者1
package direct;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_direct_queue1", false, false, false, null);//队列绑定channel.queueBind("test_direct_queue1", "test_direct_exchange", "insert");channel.queueBind("test_direct_queue1", "test_direct_exchange", "delete");channel.queueBind("test_direct_queue1", "test_direct_exchange", "update");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者1:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_direct_queue1", false, consumer);} } -
消费者2
package direct;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_direct_queue2", false, false, false, null);//队列绑定channel.queueBind("test_direct_queue2", "test_direct_exchange", "select");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者2:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_direct_queue2", false, consumer);} }
-
-
测试:
-
先运行生产者,因为需要先创建交换机,此步的消息忽略。

-
再运行消费者,消费者的队列绑定交换机。

-
最后运行生产者。

-
注意:是 队列和路由键 进行绑定,当队列绑定了路由键,消费者再监听该队列时,所有的队列信息都能拿到。通常,每个消费者 只监听自己的消费队列。
2.2.5 通配符模式

通配符模式 和 路由模式的区别:
- 路由键支持模糊匹配
匹配符号:
-
*:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
-
#:匹配0个或更多个词
-
生产者
package topic;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("test_topic_exchange", "topic");String msg = "orange_rabbit";channel.basicPublish("test_topic_exchange", "orange.rabbit", null, msg.getBytes() );msg = "beautiful_smart_fox";channel.basicPublish("test_topic_exchange", "beautiful.smart.fox#.fox", null, msg.getBytes() );channel.close();connection.close();}} -
消费者
-
消费者1
package topic;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_topic_queue1", false, false, false, null);//绑定队列channel.queueBind("test_topic_queue1", "test_topic_exchange", "orange.*");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者1:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_topic_queue1", false, consumer);} } -
消费者2
package topic;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare("test_topic_queue2", false, false, false, null);//绑定队列channel.queueBind("test_topic_queue2", "test_topic_exchange", "#.fox");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者2:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_topic_queue2", false, consumer);} }
-
-
测试:
-
先运行生产者,因为需要先创建交换机,此步的消息忽略。
-
再运行消费者,消费者的队列绑定交换机。
-
最后运行生产者。

-
2.3 消息的持久化
消息丢失:
- 消费者发生异常,丢失消息 --> 解决方案:手动ack
- 服务器发生宕机 --> 解决方案:持久化
基于通配符模式代码修改
-
生产者修改
package persistence.topic;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import utils.ConnectionUtil;public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型,持久化)channel.exchangeDeclare("test_topic_exchange2", "topic", true);String msg = "orange_rabbit";channel.basicPublish("test_topic_exchange", "orange.rabbit", null, msg.getBytes() );msg = "beautiful_smart_fox";//第三个参数可以让消息持久化channel.basicPublish("test_topic_exchange", "beautiful.smart.fox#.fox", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes() );channel.close();connection.close();}} -
消费者
-
消费者1
package persistence.topic;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列,第二个参数为true:支持持久化channel.queueDeclare("test_topic_queue1", true, false, false, null);//绑定队列channel.queueBind("test_topic_queue1", "test_topic_exchange", "orange.*");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者1:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_topic_queue1", false, consumer);} } -
消费者2
package persistence.topic;import com.rabbitmq.client.*; import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明队列,第二个参数为true:支持持久化channel.queueDeclare("test_topic_queue2", false, false, false, null);//绑定队列channel.queueBind("test_topic_queue2", "test_topic_exchange", "#.fox");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("消费者2:" + s);channel.basicAck(envelope.getDeliveryTag(), false);}};//监听队列channel.basicConsume("test_topic_queue2", false, consumer);} }
-
相关文章:
rabbitmq-java基础详解
一、rabbitmq是什么? 1、MQ定义 MQ(Message Queue)消息队列 主要解决:异步处理、应用解耦、流量削峰等问题,是分布式系统的重要组件,从而实现高性能,高可用,可伸缩和最终一致性的架…...
openssl3.2 - 官方demo学习 - smime - smsign.c
文章目录 openssl3.2 - 官方demo学习 - smime - smsign.c概述笔记END openssl3.2 - 官方demo学习 - smime - smsign.c 概述 从证书中得到X509*和私钥指针 用证书和私钥对铭文进行签名, 得到签名后的pkcs7指针 将pkcs7指向的bio_in, 写为MIME格式的签名密文 BIO_reset() 可以…...
Klocwork—符合功能安全要求的自动化静态测试工具
产品概述 Klocwork是Perforce公司产品,主要用于C、C、C#、Java、 python和Kotlin代码的自动化静态分析工作,可以提供编码规则检查、代码质量度量、测试结果管理等功能。Klocwork可以扩展到大多数规模的项目,与大型复杂环境、各种开发工具集成…...
运筹说 第56期 | 整数规划的数学模型割平面法
前几章讨论过的线性规划问题的一个共同特点是:最优解的取值可以是分数或者小数。然而,在许多实际问题中,决策者要求最优解必须是整数,例如公交车的车辆数、员工的人数、机器的台数、产品的件数等。那么,我们能否将得到…...
vue中内置指令v-model的作用和常见使用方法介绍以及在自定义组件上支持
文章目录 一、v-model是什么二、什么是语法糖三、v-model常见的用法1、对于输入框(input):2、对于复选框(checkbox):3、对于选择框(select):4、对于组件(comp…...
大模型推理引擎面试复习大纲
Transformer原理 基本组成、注意力机制含义 transformer有哪些模块,各个模块有什么作用? transformer的模块可以分为以下几类: Encoder模块:transformer的编码器,它由多个相同的encoder层堆叠而成,每个enc…...
网络安全 | 苹果承认 GPU 安全漏洞存在,iPhone 12、M2 MacBook Air 等受影响
1 月 17 日消息,苹果公司确认了近期出现的有关 Apple GPU 存在安全漏洞的报告,并承认 iPhone 12 和 M2 MacBook Air 受影响。 该漏洞可能使攻击者窃取由芯片处理的数据,包括与 ChatGPT 的对话内容等隐私信息。 安全研究人员发现,…...
C++ 数论相关题目(约数)
1、试除法求约数 主要还是可以成对的求约数进行优化,不然会超时。 时间复杂度根号n #include <iostream> #include <vector> #include <algorithm>using namespace std;int n;vector<int> solve(int a) {vector<int> res;for(int i…...
freeswitch on centos dockerfile模式
概述 freeswitch是一款简单好用的VOIP开源软交换平台。 centos7 docker上编译安装fs的流程记录,本文使用dockerfile模式。 环境 docker engine:Version 24.0.6 centos docker:7 freeswitch:v1.6.20 dockerfile 创建空目录…...
Hologres + Flink 流式湖仓建设
Hologres + Flink 流式湖仓建设 1 Flink + Hologres 特性1.2 实时维表 Lookup1.3 高性能实时写入与更新1.4 多流合并1.5 Hologres 作为 Flink 的数据源1.6 元数据自动发现与更新2 传统实时数仓分层方案2.1传统实时数仓分层方案 1:流式 ETL2.2 传统实时数仓分层方案 2:定时调度…...
Linux粘滞位的理解,什么是粘滞位?
文章目录 前言如何理解?粘滞位的操作最后总结一下 前言 粘滞位(Stickybit),或粘着位,是Unix文件系统权限的一个旗标。最常见的用法在目录上设置粘滞位,如此以来,只有目录内文件的所有者或者root…...
Stable Diffusion的结构要被淘汰了吗?详细解读谷歌最新大杀器VideoPoet
Diffusion Models视频生成-博客汇总 前言:视频生成领域长期被Stable Diffusion统治,大部分的方式都是在预训练的图片Stable Diffusion的基础上加入时间层,学习动态信息。虽然有CoDi《【NeurIPS 2023】多模态联合视频生成大模型CoDi》等模型尝试过突破这一结构的局限,但是都…...
深度学习与大数据推动下的自然语言处理革命
引言: 在当今数字化时代,深度学习和大数据技术的迅猛发展为自然语言处理(Natural Language Processing, NLP)领域注入了新的活力。这些技术的进步不仅推动了计算机对人类语言理解与生成的能力,也在搜索引擎、语音助手、…...
产品经理必备之最强管理项目过程工具----禅道
目录 一.禅道的下载安装 二.禅道的使用 2.1 创建用户 2.2 产品经理的角色 2.3 项目经理的角色 研发的角色 2.4 测试主管的角色 研发角色 三.禅道使用的泳道图 一.禅道的下载安装 官网:项目管理软件 开源项目管理软件 免费项目管理软件 IPD管理软件 - 禅…...
美易官方:贝莱德预计美联储将在6月份开始降息,欧洲央行紧随其后
正文: 根据贝莱德的最新预测,美联储将在6月份开始降息,这一消息早于欧洲央行的预期。贝莱德高级投资策略师Laura Cooper表示:“我们更倾向于6月份降息、然后重新校准政策。”预计美联储在年底前将会降息75至100个基点。 与此同时…...
视觉检测系统:工厂生产零部件的智能检测
在工厂的生产加工过程中,工业视觉检测系统被广泛应用,并且起着重要的作用。它能够对不同的零部件进行多功能的视觉检测,包括尺寸和外观的缺陷。随着制造业市场竞争越来越激烈,对产品质检效率的要求不断提高,传统的人工…...
Spring事务的四大特性+事务的传播机制+隔离机制
Spring事务的四大特性 ① 原子性 atomicity 原子性是指事务是一个不可分割的工作单位,事务中的操作要么都发生,要么都不发生。 事务是一个原子操作, 由一系列动作组成。 组成一个事务的多个数据库操作是一个不可分割的原子单元,只有所有的…...
基于arcgis js api 4.x开发点聚合效果
一、代码 <html> <head><meta charset"utf-8" /><meta name"viewport"content"initial-scale1,maximum-scale1,user-scalableno" /><title>Build a custom layer view using deck.gl | Sample | ArcGIS API fo…...
什么是DDOS高防ip?DDOS高防ip是怎么防护攻击的
随着互联网的快速发展,网络安全问题日益突出,DDoS攻击和CC攻击等网络威胁对企业和网站的正常运营造成了巨大的威胁。为了解决这些问题,高防IP作为一种网络安全服务应运而生。高防IP通过实时监测和分析流量,识别和拦截恶意流量&…...
提示词工程: 大语言模型的Embedding(嵌入和Fine-tuning(微调)
本文是针对这篇文章(https://www.promptengineering.org/master-prompt-engineering-llm-embedding-and-fine-tuning/)的中文翻译,用以详细介绍Embedding(语义嵌入)和Fine Tuning(微调)的概念和…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...
宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
