15.消息队列RabbitMQ
一、基本概念
RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量,安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面,访问地址 账号密码均为 guest。
rabbitmq-plugins enable rabbitmq_management
二、用户
- 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
- 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
- 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。
三、工作模式
RabbitMQ主要有五种工作模式,分别是:
- 简单模式(hello world)
- 工作队列模式(work queue)
- 发布/订阅模式(publish/subscribe)
- 路由模式(routing)
- 主题模式(topic)
导入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>
工具类:
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("vhost");factory.setUsername("guest");factory.setPassword("guest");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}
1.简单模式(hello world):
//发送信息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 消息内容String message = "Hello World!";channel.basicPublish("", "hello", null, message.getBytes());//关闭通道和连接channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列channel.basicConsume("hello", true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());}
}
2.工作队列模式(work queue):多个消费者消费同一队列消息。
//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示接收到消息马上自动确认完成channel.basicConsume("hello", false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回确认状态,否则表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
3.发布/订阅模式(publish/subscribe):通过交换机发送消息到多个队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
4.路由模式(routing):通过交换机进行路由匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";//指定消息路由channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
5.主题模式(topic):通过交换机进行通配符匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "Hello World!";//指定消息匹配关键字channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个通配符channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
四、Spring整合
Spring 提供了 RabbitTemplate 类执行消息发送。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 192.168.88.88port: 5672username: guestpassword: guestvirtual-host: /
@Configuration
public class MQConfig {@Beanpublic Exchange exchange1(){return ExchangeBuilder.fanoutExchange("fanout").build();}@Beanpublic Exchange exchange2(){return ExchangeBuilder.directExchange("direct").build();}@Beanpublic Queue queue1(){return QueueBuilder.durable("hello1").build();}@Beanpublic Queue queue2(){return QueueBuilder.durable("hello2").build();}@Beanpublic Binding binding1(Exchange exchange1,Queue queue1){return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();}@Beanpublic Binding binding2(Exchange exchange2,Queue queue2){return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();}
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {@RabbitHandlerpublic void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {//手动返回状态if () {// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true,表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}}
}
相关文章:
15.消息队列RabbitMQ
一、基本概念 RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息…...

并发编程之死锁问题介绍
一、本文概览 死锁问题在并发编程中是一个非常致命的问题,问题一旦产生,只能通过重启机器、修改代码来修复问题,下面我们通过一小段文章内容介绍下死锁以及如何死锁的预防 二、什么是死锁? 在介绍死锁之前,先来明确下什…...

【python学习笔记】:SQL常用脚本(一)
1、行转列的用法PIVOT CREATE table test (id int,name nvarchar(20),quarter int,number int) insert into test values(1,N苹果,1,1000) insert into test values(1,N苹果,2,2000) insert into test values(1,N苹果,3,4000) insert into test values(1,N苹果,4,5000) insert…...

Spring是怎么解决循环依赖的
1.什么是循环依赖: 这里给大家举个简单的例子,相信看了上一篇文章大家都知道了解了spring的生命周期创建流程。那么在Spring在生命周期的哪一步会出现循环依赖呢? 第一阶段:实例化阶段 Instantiation 第二阶段:属性赋…...

HTML创意动画代码
目录1、动态气泡背景2、创意文字3、旋转立方体1、动态气泡背景 <!DOCTYPE html> <html> <head><title>Bubble Background</title><style>body {margin: 0;padding: 0;height: 100vh;background: #222;display: flex;flex-direction: colum…...
软工第一次个人作业——阅读和提问
软工第一次个人作业——阅读和提问 项目内容这个作业属于哪个课程2023北航敏捷软件工程这个作业的要求在哪里个人作业-阅读和提问我在这个课程的目标是体验敏捷开发过程,掌握一些开发技能,为进一步发展作铺垫这个作业在哪个具体方面帮助我实现目标对本课…...
urho3d的自定义文件格式
Urho3D尽可能使用现有文件格式,仅在绝对必要时才定义自定义文件格式。当前使用的自定义文件格式有: 二进制模型格式(.mdl) Model geometry and vertex morph data byte[4] Identifier "UMDL" or "UMD2" …...

spark第一章:环境安装
系列文章目录 spark第一章:环境安装 文章目录系列文章目录前言一、文件准备1.文件上传2.文件解压3.修改配置4.启动环境二、历史服务器1.修改配置2.启动历史服务器总结前言 spark在大数据环境的重要程度就不必细说了,直接开始吧。 一、文件准备 1.文件…...
MySQL---存储过程与存储函数的相关概念
MySQL—存储过程与存储函数的相关概念 存储函数和存储过程的主要区别: 存储函数一定会有返回值的存储过程不一定有返回值 存储过程和函数能后将复杂的SQL逻辑封装在一起,应用程序无需关注存储过程和函数内部复杂的SQL逻辑,而只需要简单地调…...

PMP值得考吗?
第一,PMP的价值体现 1、PMP是管理岗位必考证书。 多数企业会选择优先录用持PMP证书的管理人才,PMP成为管理岗位的必考证书。PMP在很多外企和国内中大型企业非常受重视,中石油、中海油、华为等等都会给内部员工做培训。 这些机构对项目管理…...
Quartus 报错汇总(持续更新...)
1、Error (10663): Verilog HDL Port Connection error at top_rom.v(70): output or inout port "stcp" must be connected to a structural net expression输出变量stcp在原设计文件中已经定义为reg型,在实例化时不能再定义为reg型,而应该是…...

Netty权威指南总结(一)
一、为什么选择Netty:API使用简单,开发门槛低,屏蔽了NIO通信的底层细节。功能强大,预制了很多种编解码功能,支持主流协议。定制能力强,可以通过ChannelHandler对通信框架进行灵活地拓展。性能高、成熟、稳定…...

Elasticsearch:如何轻松安全地对实时 Elasticsearch 索引重新索引你的数据
在很多的时候,由于一些需求,我们不得不修改索引的映射,也即 mapping,这个时候我们需要重新索引(reindex)来把之前的数据索引到新的索引中。槽糕的是,我们的这个索引还在不断地收集实时数据&…...

【算法笔记】前缀和与差分
第一课前缀和与差分 算法是解决问题的方法与步骤。 在看一个算法是否优秀时,我们一般都要考虑一个算法的时间复杂度和空间复杂度。 现在随着空间越来越大,时间复杂度成为了一个算法的重要指标,那么如何估计一个算法的时间复杂度呢…...
python实战应用讲解-【实战应用篇】函数式编程-八皇后问题(附示例代码)
目录 知识储备-迭代器相关模块 itertools 模块 创建新的迭代器 根据最短输入序列长度停止的迭代器...

【Servlet篇】如何解决Request请求中文乱码的问题?
前言 前面一篇文章我们探讨了 Servlet 中的 Request 对象,Request 请求对象中封装了请求数据,使用相应的 API 就可以获取请求参数。 【Servlet篇】一文带你读懂 Request 对象 也许有小伙伴已经发现了前面的方式获取请求参数时,会出现中文乱…...

SpringBoot:SpringBoot简介与快速入门(1)
SpringBoot快速入门1. SpringBoot简介2. SpringBoot快速入门2.1 创建SpringBoot项目(必须联网,要不然创建失败,在模块3会讲到原因)2.2 编写对应的Controller类2.3 启动测试3. Spring官网构建工程4. SpringBoot工程快速启动4.1 为什…...

RabbitMQ学习(十一):RabbitMQ 集群
一、集群1.1 为什么要使用集群前面我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以…...

学渣适用版——Transformer理论和代码以及注意力机制attention的学习
参考一篇玩具级别不错的代码和案例 自注意力机制 注意力机制是为了transform打基础。 参考这个自注意力机制的讲解流程很详细, 但是学渣一般不知道 key,query,value是啥。 结合B站和GPT理解 注意力机制是一种常见的神经网络结构࿰…...

网上这么多IT的培训机构,我们该怎么选?
说实话,千万不要把这个答案放在网上来找,因为你只能得到别人觉得合适的或者机构的广告;当然个人的培训经历可以听一听的,毕竟不靠谱的机构也有,比如让你交一两万去上线上课程或者一百号来人坐一起看视频,这…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...

iview框架主题色的应用
1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题,无需引入,直接可…...

Ubuntu Cursor升级成v1.0
0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开,快捷键也不好用,当看到 Cursor 升级后,还是蛮高兴的 1. 下载 Cursor 下载地址:https://www.cursor.com/cn/downloads 点击下载 Linux (x64) ,…...
LOOI机器人的技术实现解析:从手势识别到边缘检测
LOOI机器人作为一款创新的AI硬件产品,通过将智能手机转变为具有情感交互能力的桌面机器人,展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家,我将全面解析LOOI的技术实现架构,特别是其手势识别、物体识别和环境…...
用鸿蒙HarmonyOS5实现中国象棋小游戏的过程
下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

海云安高敏捷信创白盒SCAP入选《中国网络安全细分领域产品名录》
近日,嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》,海云安高敏捷信创白盒(SCAP)成功入选软件供应链安全领域产品名录。 在数字化转型加速的今天,网络安全已成为企业生存与发展的核心基石,为了解…...