rabbitmq入门学习
写在前面
本文看下rabbit mq的基础概念以及使用。
1:简单介绍
为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的,即只能通过Java语言来实现和使用,与jms类似还有基于net的nms,这也是一套规范接口,只不过是基于.net开发语言的。不管是jms还是nms,它们都有一个通病,就是无法实现跨语言,这个时候amqp就应用而生了,可以将其理解为一种应用层的协议,构建在tcp之上,因此就可以实现跨语言的消息通信,参考下图:
amqp协议通信模型如下:
2:基础环境准备
2.1:服务安装
参考docker安装rabbitmq 。
2.2:创建Virtual host和用户
- 创建virtual host
- 创建admin用户
- 设置admin权限
添加成功
:
3:正戏
本文主要看其提供的5种队列,如下图:
3.1:简单队列
简单队列就是一个生产者一个消费者的队列方式,如下图:
- 生产消息:
public class SimpleSend {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World come here!!!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
生产后:
点击q_test_01
:
- 消费消息
public class SimpleRecv {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列 true 这里使用自动确认,及消息消费默认消费成功,这种方式效率高,但是容易丢失消息// 如果某些场景允许部分消息丢失,但是要求执行效率,则可以考虑将该值设置为true,否则设置为false,即手动确认// 最周通过执行channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完成手动确认,多一次网络通信channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}}
}
输出:
[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[x] Received 'Hello World come here!!!'
3.2:work队列
一个生产者多个消费者,如下图:
work队列看起来和简单队列相比只是多起了几个消费者而已。
- 生产者
public class WorkSend {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {// 消息内容String message = "" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();}
}
- 消费者
public class WorkRecv {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发一条消息给消费者// 该值设置为1,配合手动确认,则可以实现一条一条消费,确认一条后消费下一条,且多个消费者时谁的消费能力强,谁消费的消息多// 消费者之间消费消息不相互影响channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示自动channel.basicConsume(QUEUE_NAME, false, consumer);
// channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [y] Received '" + message + "'");//休眠Thread.sleep(10);// 返回确认状态,注释掉表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
3.3:订阅模式
这种这种模式可以实现一个消息同时被多个消费者消费(广播)
,但是具体的实现需要依赖于交换器exchange,生产者端将消息发送到交换器,之后消费者端只需要将某个消息队列绑定到交换器,交换器会将消息发送到绑定的所有队列,消费者端就可以从队列中获取到对应的消息,但需要注意一个队列的消息还是只可以获取一次,如下图们:
- 生产者端
public class SubscribeSend {private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
消费者端2个
:
public class SubscribeRecv {private final static String QUEUE_NAME = "test_queue_work1";private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class SubscribeRecv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv2] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
-
启动2个消费者
-
启动生产者
[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv2] Received 'Hello World!'[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv] Received 'Hello World!'
接着看下转换器和队列的绑定关系:
注意以下的问题:
1:因为交换器本身不具备数据存储的能力,所以如果是某个交换器上没有绑定任何的队列,则该消息将会丢失。
2:因为交换器本身不存储数据,所以在具体的消息队列绑定到交换器(即消费者启动前),不要生产消息到交换器,否则消息将会丢失。
3.4:路由模式
这种方式类似于订阅模式,也需要转换器作为中间商
,但是并不会直接无脑
的发送消息,而是会根据消费者额外指定的路由key,生产者在向转换器发送消息时会带着routeKey,消费者在消费消息时会指定自己期望的routeKey只有二者匹配时,才会从队列中消费对应的消息,l另外注意的时交换机类型设置为direct
,如下图:
- 生产者
public class RoutingSend {private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange 设置交换器为direct,即路由模式channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";// 设置路由key为"update",即只有对应队列上设置了update路由key的消费者才会消费到消息channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
- 2个消费者
public class RoutingRecv {private final static String QUEUE_NAME = "test_queue_work_route";private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机 设置路由key为delete,updatechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class RoutingRecv2 {private final static String QUEUE_NAME = "test_queue_work_route2";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
先启动消费者再启动生产者测试可。
写在后面
参考文章列表
RabbitMQ使用教程(超详细)
docker安装rabbitmq
RabbitMq的一些概念,JMS、AMQP、MQ 。
相关文章:

rabbitmq入门学习
写在前面 本文看下rabbit mq的基础概念以及使用。 1:简单介绍 为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的&…...

说说对Fiber架构的理解?解决了什么问题?
一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的,当其中一个线程执行时,另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程,那么渲染层面的更新就不得不长时间地等待,界面长时间不更新,会导…...

Spring Security笔记
Spring Security 是 Spring家族中的一个安全管理框架。 一般来说中大型的项目都是使用 SpringSecurity 来做安全框架,小项目用相对简单的Shiro。认证、授权是 SpringSecurity 作为安全框架的核心功能。 认证:通过用户名密码验证当前访问系统的是不是本…...

快速教程|如何在 AWS EC2上使用 Walrus 部署 GitLab
Walrus 是一款基于平台工程理念的开源应用管理平台,致力于解决应用交付领域的深切痛点。借助 Walrus 将云原生的能力和最佳实践扩展到非容器化环境,并支持任意应用形态统一编排部署,降低使用基础设施的复杂度,为研发和运维团队提供…...

[vmware]vmware虚拟机压缩空间清理空间
vmware中的ubuntu使用如果拷贝文件进去在删除,vmare镜像文件并不会减少日积月累会不断是的真实物理磁盘空间大幅度减少,比如我以前windows操作系统本来只有30GB最后居然占道硬盘200GB,清理方法有2种。 第一种:vmware界面操作 第二…...

一篇文章带你使用(MMKV--基于 mmap 的高性能通用 key-value 组件)
一、MMKV是什么? MMKV 是基于 mmap 内存映射的 key-value 组件,底层序列化/反序列化使用 protobuf 实现,性能高,稳定性强。也是腾讯微信团队使用的技术。 支持的数据类型 支持以下 Java 语言基础类型: boolean、int…...

Pytorch 里面torch.no_grad 和model.eval(), model.train() 的作用
torch.no_grad: 影响模型的自微分器,使得其停止工作;这样的话,数据计算的数据就会变快,内存占用也会变小,因为没有了反向梯度计算,当然,我哦们也无法做反向传播。 model.eval() 和model.train()…...

Ozon产品内容评级功能上线,妙手ERP实力助力Ozon卖家全方位打造爆款产品!
产品内容评级,可以直接反映产品质量的高低,也是影响产品排名的关键。具有较高内容评级的产品,将有更大机会显示在搜索结果和类目的前几页中,从而引起买家的关注,促进销售。 为帮助卖家打造高质量产品,妙手…...

Linux 下最主流的文件系统格式——ext
硬盘分成相同大小的单元,我们称为块(Block)。一块的大小是扇区大小的整数倍,默认是 4K。在格式化的时候,这个值是可以设定的。 一大块硬盘被分成了一个个小的块,用来存放文件的数据部分。这样一来…...

变量环境、变量提升和暂时性死区
JavaScript中的提升 在JavaScript中,“Hoisting”(提升)是一种特性,它将变量和函数的声明移动到作用域的顶部。这意味着可以在声明之前使用这些变量和函数,而不会报错。 当JavaScript代码执行时,会经过两个…...

yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)
多目标追踪实例分割目标检测 YOLO (You Only Look Once) 是一个流行的目标检测算法,它能够在图像中准确地定位和识别多个物体。 本项目是基于 YOLO 算法的目标跟踪系统,它将 YOLO 的目标检测功能与目标跟踪技术相结合,实现了实时的多目标跟…...

【神经网络】【GoogleNet】
1、引言 卷积神经网络是当前最热门的技术,我想深入地学习这门技术,从他的发展历史开始,了解神经网络算法的兴衰起伏;同时了解他在发展过程中的**里程碑式算法**,能更好的把握神经网络发展的未来趋势,了解神…...

网络安全深入学习第八课——正向代理(工具:ReGeorg)
文章目录 一、环境配置二、开始模拟1、拿下跳板机的Webshell权限,并上传shell文件1.1、查看跳板机网络环境1.2、查看arp表 2、使用ReGeorg来建立连接2.1、生产ReGeorg隧道文件2.2、上传ReGeorg隧道的PHP脚本到跳板机2.3、连接隧道2.4、尝试浏览器连接 3、使用Proxif…...

Jmeter全流程性能测试实战
项目背景: 我们的平台为全国某行业监控平台,经过3轮功能测试、接口测试后,98%的问题已经关闭,决定对省平台向全国平台上传数据的接口进行性能测试。 01、测试步骤 1、编写性能测试方案 由于我是刚进入此项目组不久,…...

Python算法例8 将整数A转换为B
1. 问题描述 给定整数A和B,求出将整数A转换为B,需要改变bit的位数。 2. 问题示例 把31转换为14,需要改变2个bit位,即:(31)10(11111)2,(14&…...

一个基于百度飞桨封装的.NET版本OCR工具类库 - PaddleOCRSharp
前言 大家有使用过.NET开发过OCR工具吗?今天给大家推荐一个基于百度飞桨封装的.NET版本OCR工具类库:PaddleOCRSharp。 OCR工具有什么用? OCR(Optical Character Recognition)工具可以将图像或扫描文件中的文本内容转…...

在 CelebA 数据集上训练的 PyTorch 中的基本变分自动编码器
摩西西珀博士 一、说明 我最近发现自己需要一种方法将图像编码到潜在嵌入中,调整嵌入,然后生成新图像。有一些强大的方法可以创建嵌入或从嵌入生成。如果你想同时做到这两点,一种自然且相当简单的方法是使用变分自动编码器。 这样的深度网络不…...

利用Ansible实现批量Linux服务器安全配置
1.摘要 在上一篇<<初步利用Ansible实现批量服务器自动化管理>>文章中, 我初步实现了通过编写清单和剧本来实现多台服务器的自动化管理,在本章节中, 我将利用Ansible的剧本来实现更实用、更复杂一点的功能, 主要功能包括三个:1.同时在三台服务器中增加IP访问控制,只…...

读书笔记:彼得·德鲁克《认识管理》第8章 战略规划:企业家技能
一、章节内容概述 战略规划帮助做好当前的业务以迎接未来。战略规划需要思考业务应该是什么,当前必须做什么才能赢得未来。战略规划需要进行风险决策,需要有组织地抛弃过去的业务,要求清晰界定和明确安排为实现理想的未来而开展的工作。战略…...

HarmonyOS应用开发-视频播放器与弹窗
Viedo组件 在手机、平板或是智慧屏这些终端设备上,媒体功能可以算作是我们最常用的场景之一。无论是实现音频的播放、录制、采集,还是视频的播放、切换、循环,亦或是相机的预览、拍照等功能,媒体组件都是必不可少的。以视频功能为…...

java中对象的引用是什么?
引用和指向 例如: new Student(); 代表创建了一个Student对象,但是也仅仅是创建了一个对象,没有办法访问它。 为了访问这个对象,会使用引用来代表这个对象 Student s new Student(); s这个变量是Student类型,又叫做引…...

jenkins插件迁移
将Jenkins插件迁移至不同的Jenkins实例或更新插件版本是一项常见的任务。以下是迁移Jenkins插件的一般步骤: 备份现有插件: 在开始迁移之前,首先备份你当前的Jenkins实例以及所有相关的插件。这可以通过复制Jenkins的JENKINS_HOME目录来实现…...

RK356X Android13.0 HDMI和喇叭同时出声音
补丁适用范围:RK356X Android13.0 Android默认音频输出逻辑,不接HDMI默认喇叭音频输出,若检测到HDMI接入后,关闭喇叭输出,开启HDMI音频输出,但是BOX产品的使用场景需要插入HDMI后,喇叭仍然输出,可加入此补丁 $ vim frameworks/base/services/core/java/com/android/s…...

vue sass-loader,webpack安装卸载操作命令
检查 node-sass 的可用版本:运行下面的命令,查看 node-sass 的可用版本列表。 查看 npm view node-sass versions卸载 npm uninstall node-sass安装指定版本 npm install node-sass4.14.1安装最新版本 npm install sass-loaderlatest如果没有指定特定…...

nacos应用——占用内存过多问题解决(JVM调优初步)
问题描述 最近搞了一台1年的阿里云服务器,安装了一下常用的MySQL,Redis,rabbitmq,minio,然后有安装了一下nacos,结果一启动nacos内存占用就很高,就比较限制我继续安装其他镜像或者启动别的服务…...

大漠插件(二、Qt使用插件时注意事项)
本章目的 在上篇已经注册完毕大漠,那么怎么使用大漠来制作脚本,我选择了我最熟悉的Qt来开发,毕竟只是小软件,用脚本或者c都差不了多少。本章就是开发途中的一些坑。 本人开发环境是 win11 64、Qt 5.15.2安装了5.10.0的msvc2015 32…...

CSS 浮动
目标target✓ 能够说出来为什么需要浮动能够说出来浮动的排列特性能够说出来三种最常见的布局方式能够说出来为什么需要清除浮动,能够至少写出两种清楚浮动的方法能够利用Photoshop实现基本的切图能够利用Photoshop插件实现切图能够完成学成在线的页面布 传统网页布局的三种模…...

基于STM32+华为云IOT设计的火灾感知系统
一、设计需求 【1】 项目背景 随着城市化进程的加快和人们生活水平的提高,火灾事故频繁发生,给人们的生命财产安全带来巨大威胁。因此,开发一种可靠的火灾感知系统对于预防和减少火灾事故具有重要意义。近年来,随着物联网技术的发展,基于物联网的火灾感知系统逐渐成为研…...

算法通关村第八关|白银|二叉树的深度和高度问题【持续更新】
1.最大深度问题(后序遍历) 只需要一直递归,维护一个最大值。每一层只要有一个子节点,这个最大值就可以增加。 public int maxDepth(TreeNode root) {if (root null) {return 0;}int leftHeight maxDepth(root.left);int right…...

cmake 之add_definitions使用误区
需求 需要实现,在cmake中定义宏定义,可以:1) 在code中可以使用;2) 在cmake中可以识别是否已定义 问题 宏定义,cmake有add_definitions函数,直观的实现方法如下。 cmake_minimum…...