15.消息队列RabbitMQ
一、基本概念
RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量,安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面,访问地址 账号密码均为 guest。
rabbitmq-plugins enable rabbitmq_management
二、用户
- 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
- 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
- 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。
三、工作模式
RabbitMQ主要有五种工作模式,分别是:
- 简单模式(hello world)
- 工作队列模式(work queue)
- 发布/订阅模式(publish/subscribe)
- 路由模式(routing)
- 主题模式(topic)
导入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>
工具类:
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("vhost");factory.setUsername("guest");factory.setPassword("guest");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}
1.简单模式(hello world):
//发送信息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 消息内容String message = "Hello World!";channel.basicPublish("", "hello", null, message.getBytes());//关闭通道和连接channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列channel.basicConsume("hello", true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());}
}
2.工作队列模式(work queue):多个消费者消费同一队列消息。
//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示接收到消息马上自动确认完成channel.basicConsume("hello", false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回确认状态,否则表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
3.发布/订阅模式(publish/subscribe):通过交换机发送消息到多个队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
4.路由模式(routing):通过交换机进行路由匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";//指定消息路由channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
5.主题模式(topic):通过交换机进行通配符匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "Hello World!";//指定消息匹配关键字channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个通配符channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
四、Spring整合
Spring 提供了 RabbitTemplate 类执行消息发送。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 192.168.88.88port: 5672username: guestpassword: guestvirtual-host: /
@Configuration
public class MQConfig {@Beanpublic Exchange exchange1(){return ExchangeBuilder.fanoutExchange("fanout").build();}@Beanpublic Exchange exchange2(){return ExchangeBuilder.directExchange("direct").build();}@Beanpublic Queue queue1(){return QueueBuilder.durable("hello1").build();}@Beanpublic Queue queue2(){return QueueBuilder.durable("hello2").build();}@Beanpublic Binding binding1(Exchange exchange1,Queue queue1){return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();}@Beanpublic Binding binding2(Exchange exchange2,Queue queue2){return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();}
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {@RabbitHandlerpublic void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {//手动返回状态if () {// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true,表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}}
}
相关文章:
15.消息队列RabbitMQ
一、基本概念 RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息…...
并发编程之死锁问题介绍
一、本文概览 死锁问题在并发编程中是一个非常致命的问题,问题一旦产生,只能通过重启机器、修改代码来修复问题,下面我们通过一小段文章内容介绍下死锁以及如何死锁的预防 二、什么是死锁? 在介绍死锁之前,先来明确下什…...
【python学习笔记】:SQL常用脚本(一)
1、行转列的用法PIVOT CREATE table test (id int,name nvarchar(20),quarter int,number int) insert into test values(1,N苹果,1,1000) insert into test values(1,N苹果,2,2000) insert into test values(1,N苹果,3,4000) insert into test values(1,N苹果,4,5000) insert…...
Spring是怎么解决循环依赖的
1.什么是循环依赖: 这里给大家举个简单的例子,相信看了上一篇文章大家都知道了解了spring的生命周期创建流程。那么在Spring在生命周期的哪一步会出现循环依赖呢? 第一阶段:实例化阶段 Instantiation 第二阶段:属性赋…...
HTML创意动画代码
目录1、动态气泡背景2、创意文字3、旋转立方体1、动态气泡背景 <!DOCTYPE html> <html> <head><title>Bubble Background</title><style>body {margin: 0;padding: 0;height: 100vh;background: #222;display: flex;flex-direction: colum…...
软工第一次个人作业——阅读和提问
软工第一次个人作业——阅读和提问 项目内容这个作业属于哪个课程2023北航敏捷软件工程这个作业的要求在哪里个人作业-阅读和提问我在这个课程的目标是体验敏捷开发过程,掌握一些开发技能,为进一步发展作铺垫这个作业在哪个具体方面帮助我实现目标对本课…...
urho3d的自定义文件格式
Urho3D尽可能使用现有文件格式,仅在绝对必要时才定义自定义文件格式。当前使用的自定义文件格式有: 二进制模型格式(.mdl) Model geometry and vertex morph data byte[4] Identifier "UMDL" or "UMD2" …...
spark第一章:环境安装
系列文章目录 spark第一章:环境安装 文章目录系列文章目录前言一、文件准备1.文件上传2.文件解压3.修改配置4.启动环境二、历史服务器1.修改配置2.启动历史服务器总结前言 spark在大数据环境的重要程度就不必细说了,直接开始吧。 一、文件准备 1.文件…...
MySQL---存储过程与存储函数的相关概念
MySQL—存储过程与存储函数的相关概念 存储函数和存储过程的主要区别: 存储函数一定会有返回值的存储过程不一定有返回值 存储过程和函数能后将复杂的SQL逻辑封装在一起,应用程序无需关注存储过程和函数内部复杂的SQL逻辑,而只需要简单地调…...
PMP值得考吗?
第一,PMP的价值体现 1、PMP是管理岗位必考证书。 多数企业会选择优先录用持PMP证书的管理人才,PMP成为管理岗位的必考证书。PMP在很多外企和国内中大型企业非常受重视,中石油、中海油、华为等等都会给内部员工做培训。 这些机构对项目管理…...
Quartus 报错汇总(持续更新...)
1、Error (10663): Verilog HDL Port Connection error at top_rom.v(70): output or inout port "stcp" must be connected to a structural net expression输出变量stcp在原设计文件中已经定义为reg型,在实例化时不能再定义为reg型,而应该是…...
Netty权威指南总结(一)
一、为什么选择Netty:API使用简单,开发门槛低,屏蔽了NIO通信的底层细节。功能强大,预制了很多种编解码功能,支持主流协议。定制能力强,可以通过ChannelHandler对通信框架进行灵活地拓展。性能高、成熟、稳定…...
Elasticsearch:如何轻松安全地对实时 Elasticsearch 索引重新索引你的数据
在很多的时候,由于一些需求,我们不得不修改索引的映射,也即 mapping,这个时候我们需要重新索引(reindex)来把之前的数据索引到新的索引中。槽糕的是,我们的这个索引还在不断地收集实时数据&…...
【算法笔记】前缀和与差分
第一课前缀和与差分 算法是解决问题的方法与步骤。 在看一个算法是否优秀时,我们一般都要考虑一个算法的时间复杂度和空间复杂度。 现在随着空间越来越大,时间复杂度成为了一个算法的重要指标,那么如何估计一个算法的时间复杂度呢…...
python实战应用讲解-【实战应用篇】函数式编程-八皇后问题(附示例代码)
目录 知识储备-迭代器相关模块 itertools 模块 创建新的迭代器 根据最短输入序列长度停止的迭代器...
【Servlet篇】如何解决Request请求中文乱码的问题?
前言 前面一篇文章我们探讨了 Servlet 中的 Request 对象,Request 请求对象中封装了请求数据,使用相应的 API 就可以获取请求参数。 【Servlet篇】一文带你读懂 Request 对象 也许有小伙伴已经发现了前面的方式获取请求参数时,会出现中文乱…...
SpringBoot:SpringBoot简介与快速入门(1)
SpringBoot快速入门1. SpringBoot简介2. SpringBoot快速入门2.1 创建SpringBoot项目(必须联网,要不然创建失败,在模块3会讲到原因)2.2 编写对应的Controller类2.3 启动测试3. Spring官网构建工程4. SpringBoot工程快速启动4.1 为什…...
RabbitMQ学习(十一):RabbitMQ 集群
一、集群1.1 为什么要使用集群前面我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以…...
学渣适用版——Transformer理论和代码以及注意力机制attention的学习
参考一篇玩具级别不错的代码和案例 自注意力机制 注意力机制是为了transform打基础。 参考这个自注意力机制的讲解流程很详细, 但是学渣一般不知道 key,query,value是啥。 结合B站和GPT理解 注意力机制是一种常见的神经网络结构࿰…...
网上这么多IT的培训机构,我们该怎么选?
说实话,千万不要把这个答案放在网上来找,因为你只能得到别人觉得合适的或者机构的广告;当然个人的培训经历可以听一听的,毕竟不靠谱的机构也有,比如让你交一两万去上线上课程或者一百号来人坐一起看视频,这…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
