当前位置: 首页 > news >正文

RabbitMQ 消息队列安装及入门

市面常见消息队列中间件对比

技术名称吞吐量 /IO/并发时效性(类似延迟)消息到达时间可用性可靠性优势应用场景
activemq万级简单易学中小型企业、项目
rabbitmq万级极高(微秒)极高生态好(基本什么语言都支持)、时效性高、易学适合绝大数的分布式应用
kafka10万 QBS高(毫秒)极高极高吞吐量大、可靠性、可用性、强大的数据流处理能力适合大规模处理数据的场景、比如构建日志手机系统、实时数据传输、事件流收集传输
rocketmq10万 QBS高ms极高极高吞吐量大、可靠性、可用性、可扩展性适用于金融等可靠性要求较高的场景、适合大规模的消息处理。金融、电商、大规模社交
pulsar10万 QBS高ms极高可靠性、可用性很高、新兴(技术架构先进)适合大规模、高并发的分布式系统(云原生)适合实时分析、事件流处理、物联网数据处理。

RabbitMQ 

RabbitMQ 是基于 AMQP 高级消息队列协议的。

 实际使用可根据官方文档的 demo 。

官方文档:RabbitMQ Tutorials | RabbitMQ

模型

生产者:通俗就是发消息的人,比如在外卖软件上点餐的人

消费者:通俗就是处理消息的任务,比如外卖软件上的商家,需要根据顾客的要求制作餐

交换机:负责把消息转发到对应的队列中,比如有外卖的时候,系统给附近的外面小哥派单

队列:存放消息的地方,等待消费者消费,比如商家肯定不是只做一份餐,做好的餐放在一个指定的位置等待外卖小哥来取餐

路由:转发,就是怎么把消息从一个地方转到另一个地方,通常加在交换机和队列之间,比如系统指定某个范围的外卖小哥接这单

安装

1. 首先安装 RabbitMQ,直接官网下载即可,如果下载速度慢,可以换个网络,或者找找有没有国内镜像。(当初我下载的时候找了半天的镜像都是版本比较老的,结果想着挂一晚上下载,结果官网突然就快了,白折腾了。)

官方网站:Installing on Windows | RabbitMQ

国内镜像:Index of rabbitmq-server-local/v3.12.7

一路 next ,傻瓜式安装即可

安装之后检查服务中是否已经运行了。

2. 安装监控面板

在 RabbitMQ 安装目录下的 sbin 目录下的CMD 输入下面的命令

rabbitmq-plugins.bat enable rabbitmq_management

 安装成功:

默认端口号是 5672,webUI 是 15672

在浏览器输入地址打开管理界面:http://localhost:15672

默认账号密码是 guest

注意:1. 安装目录不能是中文,不能有空格等非法字符,否则页面打不开

           2. 如果想要在远程服务器访问 RabbitMQ 管理面板,需要创建管理员账号,比如在宝塔面板使用时宝塔面板提供的 admin账号,地址就是宝塔面板的 IP 

创建账号:access-control | RabbitMQ

入门

依赖引入

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

单消费者和生产者

一对一的关系

1. 生产者代码

public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();//设置了本地连接,如果修改了用户名和密码,需要设置/*factory.setPassword();factory.setUsername();*/factory.setHost("localhost");//建立连接、创建频道//频道,类似客户端,用于调用serverConnection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列,队列持久化,第二份参数设置为 truechannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//发送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

channel 频道:理解为操作消息队列的 Client,通过 channel 收发消息,提供了和消息对了 server 建立通信的传输方法

channel.queueDeclare 方法参数:

queue:这是一个字符串参数,代表要声明的队列的名称。如果队列不存在,则会自动创建一个新的队列。

durable:这是一个布尔值参数,表示队列是否持久化。如果设置为true,则队列会在服务器重启后仍然存在;如果设置为false,则队列在服务器重启后会被删除。默认值为false。

exclusive:这也是一个布尔值参数,表示队列是否为独占模式。如果设置为true,则只有当前连接可以访问该队列;如果设置为false,则其他连接也可以访问该队列。默认值为false。

autoDelete:这是另一个布尔值参数,表示队列是否自动删除。如果设置为true,则当最后一个消费者取消订阅时,队列将被删除;如果设置为false,则队列将一直存在,直到手动删除或服务器重启。默认值为false。

arguments:这是一个可选参数,用于设置队列的其他属性,比如消息的最大长度、最大优先级等。

channel.basicPublish 参数:

exchange:这是一个字符串参数,代表交换机的名称。如果不需要使用特定的交换机,可以传递一个空字符串("")。交换机是RabbitMQ中用于接收生产者发送的消息并根据绑定规则路由到队列的组件。

routingKey:这也是一个字符串参数,它指定了发布消息的队列。无论通道绑定到哪个队列,最终发布的消息都会包含这个指定的路由键。路由键是用来确定消息应该发送到哪个队列的重要信息。

message:这是要发布的消息本身,通常是字节数组的形式。

properties:这是一个可选参数,用于设置消息的属性,比如消息的优先级、过期时间等。

在使用channel.basicPublish时,需要注意以下几点:

exchange和routingKey不能为空:在AMQImpl类中的实现要求这两个参数都不能为null,否则会抛出异常。

交换机类型:根据不同的需求,可以选择不同类型的交换机,如fanout、direct或topic。每种类型的交换机都有其特定的路由规则。

非命名队列:在某些情况下,比如日志系统,可以使用非命名队列,这样消费者可以接收到所有相关的日志消息,而不是特定的部分。

2. 消费者代码

public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列,同一个消息队列参数必须一致channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//接收、消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

 channel.basicConsume 参数:

  1. queue:这是一个字符串参数,代表要消费的队列的名称。如果队列不存在,则会抛出异常。
  2. onMessage:这是一个回调函数,当有新的消息到达时会被调用。该函数需要接收两个参数:一个表示消息内容的Delivery对象和一个表示通道的Channel对象。
  3. consumerTag:这是一个可选参数,用于标识消费者。如果没有指定,则会自动生成一个唯一的标识符。
  4. autoAck:这是一个布尔值参数,表示是否自动确认消息。如果设置为true,则在消息被处理后会自动发送确认信息;如果设置为false,则需要手动发送确认信息。默认值为false。
  5. arguments:这是一个可选参数,用于设置消费者的其他属性,比如消息的最大长度、最大优先级等。

在使用channel.basicConsume时,需要注意以下几点:

  1. 队列名称:队列名称应该是唯一的,否则会抛出异常。
  2. 消息处理:在onMessage回调函数中,需要对消息进行处理,并根据需要发送确认信息。
  3. 消费者标识符:可以通过设置consumerTag来标识消费者,以便在后续操作中进行识别和管理。
  4. 消费者属性:可以通过设置消费者的其他属性来控制消费者的行为,比如设置消息的最大长度、最大优先级等。

多消费者

多个消费者,比如一个工厂生产商品,一个商店卖不完,分给多个商店一起卖

生产者代码和上面一样


public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();//设置本地连接factory.setHost("localhost");//创建队列,创建频道,类似客户端try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//队列持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//设置消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){//输入消息String message = scanner.nextLine();//发送消息channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

控制处理任务的积压数,最多同时处理任务数

channel.basicQos(1); //最多处理1个

消息确认机制

ack 确认、nack 消息失败、reject 拒绝

当消息拿走之后会有一个确认机制,保证消息成功被消费。当消费者接收消息会给一个反馈,确认消息的状态,成功消息才会被移除。

支持配置 autoack ,建议修改为 false,根据实际情况手动确认。

//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手动拒绝
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

 消费者代码

public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");for (int i = 0; i < 2; i++) {final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();//队列持久化,参数要一致channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制处理任务的积压数,最多同时处理任务数channel.basicQos(1);//定义了如何处理消息int finalI = i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//处理工作的逻辑System.out.println(" [x] Received '" +"消费者:" + finalI + " 消息:"+ message + "'");//睡一定时间,模拟机器处理能力有限Thread.sleep(20000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//接收消息,消费消息,开启消息监听channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {});}}
}

相关文章:

RabbitMQ 消息队列安装及入门

市面常见消息队列中间件对比 技术名称吞吐量 /IO/并发时效性&#xff08;类似延迟&#xff09;消息到达时间可用性可靠性优势应用场景activemq万级高高高简单易学中小型企业、项目rabbitmq万级极高&#xff08;微秒&#xff09;高极高生态好&#xff08;基本什么语言都支持&am…...

K8S认证|CKA题库+答案| 14. 排查故障节点

14、排查集群中的故障节点 CKA v1.29.0模拟系统免费下载试用&#xff1a; 百度网盘&#xff1a;https://pan.baidu.com/s/1vVR_AK6MVK2Jrz0n0R2GoQ?pwdwbki 题目&#xff1a; 您必须在以下Cluster/Node上完成此考题&#xff1a; Cluster …...

Linux:网络管理命令之ss

一、ss命令介绍 Linux下的ss命令是Socket Statistics的缩写&#xff0c;也被称为IPC&#xff08;Inter-Process Communication&#xff09;套接字统计。这是一个强大的网络管理命令&#xff0c;主要用于获取系统中socket的统计信息&#xff0c;可以帮助系统管理员诊断和排查网络…...

数据结构-队列(带图详解)

目录 队列的概念 画图理解队列 代码图理解 代码展示(注意这个队列是单链表的结构实现) Queue.h(队列结构) Queue.c(函数/API实现) main.c(测试文件) 队列的概念 队列&#xff08;Queue&#xff09;是一种基础的数据结构&#xff0c;它遵循先进先出&#xff08;First In …...

python文件名通常以什么结尾

python文件后缀一般有两个&#xff0c;分别是.py和.pyw。视窗用 python.exe 运行 .py&#xff0c;用 pythonw.exe 运行 .pyw 。 这纯粹是因为安装视窗版Python时&#xff0c;扩展名 .py 自动被登记为用 python.exe 运行的文件&#xff0c;而 .pyw 则被登记为用 pythonw.exe 运…...

前端javascript 中 JSON.parse() 的作用

1.解析 JSON 字符串 JSON.parse({"name": "tom"}) // {"name": "tom"} JSON.parse([1,2,3]) // [1,2,3] 2.转换成数字 JSON.parse(12) // 12 3.转换成布尔值 JSON.parse(false) // false...

【Linux学习】进程基础API

下面是有关进程基础API的相关介绍&#xff0c;希望对你有所帮助&#xff01; 小海编程心语录-CSDN博客 目录 1. 僵尸进程与孤儿进程 1.1 孤儿进程 1.2 僵尸进程 2. 监视子进程 2.1 wait() 2.2 waitpid() 3. 执行新程序 exec族函数 4. 守护进程 1. 僵尸进程与孤儿进程…...

音视频及H264/H256编码相关原理

一、音视频封装格式原理&#xff1a; 我们播放的视频文件一般都是用一种封装格式封装起来的&#xff0c;封装格式的作用是什么呢&#xff1f;一般视频文件里不光有视频&#xff0c;还有音频&#xff0c;封装格式的作用就是把视频和音频打包起来。 所以我们先要解封装格式&#…...

查看cpu进程数

import multiprocessing from multiprocessing import Pool# 导入 Pool 允许你创建一个进程池 # 进程池是一组工作进程&#xff0c;它们可以并行地执行多个任务# multiprocessing.cpu_count(): 返回当前机器上的CPU核心数 sum_cpu multiprocessing.cpu_count()use_cpu max(1,…...

MySQL优化篇

文章目录 库表结构优化1.规范和反规范化2.数据类型选择3.主键类型选择 索引优化聚簇索引和辅助索引&#xff08;一切的起源&#xff09;复合索引 查询优化 库表结构优化 1.规范和反规范化 表设计之间性能和数据完整性&#xff0c;耦合和解耦合之间的取舍。 进而考虑是要冗余…...

Python3 笔记:部分专有名词解释

1、python 英 /ˈpaɪθən/ 这个词在英文中的意思是蟒蛇。但据说Python的创始人Guido van Rossum&#xff08;吉多范罗苏姆&#xff09;选择Python这个名字的原因与蟒蛇毫无关系&#xff0c;只是因为他是“蒙提派森飞行马戏团&#xff08;Monty Python&#xff07;s Flying Ci…...

javaAPI文档中文版(JDK11在线版)java帮助文档,掌握文档java学习事半功倍。

&#x1f320;个人主页 : 赶路人- - &#x1f30c;个人格言 : 要努力成为梧桐&#xff0c;让喜鹊在这里栖息。 要努力成为大海&#xff0c;让百川在这里聚积。 11.by,prep.凭&#xff0c;靠&#xff0c;沿 [baɪ] 12.press,v.按&#xff0c;压 [prɛs] 菜鸟教程javaAPI文档中文…...

移动端适配:vw适配方案

vw (Viewport Width) 是一种长度单位&#xff0c;代表视口宽度的百分比。1vw 等于视口宽度的1%。在网页设计和前端开发中&#xff0c;vw 单位常用于实现响应式设计和屏幕适配&#xff0c;尤其是针对不同尺寸和分辨率的移动设备。 为什么使用vw适配&#xff1f; 响应式: 使用v…...

实战Java虚拟机-实战篇

一、内存调优 1.内存溢出和内存泄漏 内存泄漏&#xff08;memory leak&#xff09;&#xff1a;在Java中如果不再使用一个对象&#xff0c;但是该对象依然在GC ROOT的引用链上&#xff0c;这个对象就不会被垃圾回收器回收&#xff0c;这种情况就称之为内存泄漏。内存泄漏绝大…...

力扣:349. 两个数组的交集

349. 两个数组的交集 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的 交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,2,2,1], nums2 [2,2] 输出&#xff1a;[2]示例 2&#xff1a; …...

深度学习之基于Matlab的BP神经网络交通标志识别

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 随着智能交通系统&#xff08;ITS&#xff09;的快速发展&#xff0c;交通标志识别&#xff0…...

Linux备份服务及rsync企业备份架构(应用场景)

备份服务概述 备份服务:需要使用到脚本,打包备份,定时任务. 备份服务:rsyncd服务,不同主机之间数据传输. 特点&#xff1a; rsync是个服务也是命令使用方便&#xff0c;具有多种模式传输数据的时候是增量传输 增量与全量&#xff1a; 全量 &#xff1a;无论多少数据全部推…...

用手机打印需要下载什么软件

在快节奏的现代生活中&#xff0c;打印需求无处不在&#xff0c;无论是工作文件、学习资料还是生活小贴士&#xff0c;都可能需要一纸呈现。然而&#xff0c;传统的打印方式往往受限于时间和地点&#xff0c;让人倍感不便。今天&#xff0c;就为大家推荐一款便捷又省钱的手机打…...

Storm在Java中的应用

Storm在Java中的应用主要体现在构建分布式实时计算系统&#xff0c;用于处理大数据流。以下是一些Storm在Java中的具体应用场景和步骤&#xff1a; 实时数据处理&#xff1a;Storm可以实时地接收、处理和传输数据。对于需要快速响应的应用场景&#xff0c;如在线广告、金融交易…...

Java 面试题日常练习

### 基础知识 1. **什么是 JVM&#xff1f;解释其架构。** - JVM&#xff08;Java Virtual Machine&#xff09;是 Java 程序的运行时环境。其架构包括类加载器子系统、运行时数据区&#xff08;堆、栈、本地方法栈、PC 寄存器、方法区&#xff09;、执行引擎和本地方法接口…...

卷爆短剧出海:五大关键,由AIGC重构

短剧高温下&#xff0c;谈谈AIGC的助攻路线。 短剧&#xff0c;一个席卷全球的高温赛道。 以往只是踏着霸总题材&#xff0c;如今&#xff0c;内容循着精品化、IP化的自然发展风向&#xff0c;给内容、制作、平台等产业全链都带来新机&#xff0c;也让短剧消费走向文化深处&am…...

LLM实战:当网页爬虫集成gpt3.5

1. 背景 最近本qiang~关注了一个开源项目Scrapegraph-ai&#xff0c;是关于网页爬虫结合LLM的项目&#xff0c;所以想一探究竟&#xff0c;毕竟当下及未来&#xff0c;LLM终将替代以往的方方面面。 这篇文章主要介绍下该项目&#xff0c;并基于此项目实现一个demo页面&#x…...

Flutter底部导航栏和顶部Tab切换完整代码

题记 —— 执剑天涯&#xff0c;从你的点滴积累开始&#xff0c;所及之处&#xff0c;必精益求精&#xff0c;即是折腾每一天。 目前市场上绝大部分App的布局结构基本统一&#xff1a;底部导航顶部导航&#xff0c;底部导航页里嵌套顶部导航栏&#xff0c;顶部导航页里嵌套图文…...

Jupyter 使用手册: 探索交互式计算的无限可能

什么是 Jupyter? Jupyter 是一个开源的 Web 应用程序,可用于创建和共享包含实时代码、可视化和叙述性文本的文档。它最初是作为 IPython 项目的一部分开发的,后来发展成为支持多种编程语言的交互式计算环境。 应用场景 作为一个开源的交互式计算环境,Jupyter 在以下几个领域…...

IP地址显示“不安全”怎么办|已解决

解决IP地址显示“不安全”的问题&#xff0c;通常需要确保网站或服务使用HTTPS协议进行加密通信&#xff0c;可以通过部署SSL证书来解决&#xff0c;以下是具体的解决步骤&#xff1a; 1 申请IP地址SSL证书&#xff1a;网站管理员应向证书颁发机构&#xff08;CA&#xff09;申…...

国内安全实用的图纸透明加密软件厂家,靠谱的透明加密软件供应商--安秉信息

设计类图纸安全已经成为企业需要注意的问题&#xff0c;在当前互联网设计行业、汽车制造设计、机械制造行业等相关企业都需要对企业内部图纸的保护需求&#xff0c;现在在互联网中&#xff0c;企业数据泄露的事情已经层出不穷&#xff0c;企业对核心图纸的数据安全工作需要重点…...

【kubernetes】探索k8s集群中kubectl的陈述式资源管理

目录 一、k8s集群资源管理方式分类 1.1陈述式资源管理方式&#xff1a;增删查比较方便&#xff0c;但是改非常不方便 1.2声明式资源管理方式&#xff1a;yaml文件管理 二、陈述式资源管理方法 2.1查看版本信息 2.2查看资源对象简写 2.3配置kubectl自动补全 2.4node节点…...

VUE 创建组件常见的几种方式

在 Vue.js 中&#xff0c;组件的创建和使用通常遵循以下三种方法&#xff1a; 1. 全局组件 全局组件是通过 Vue.component() 方法创建的&#xff0c;注册后的组件可以在任何新创建的 Vue 实例&#xff08;包括根实例&#xff09;的模板中使用。 Vue.component(my-component,…...

华为OBS命令行简单使用

华为OBS&#xff08;Object Storage Service&#xff09;是一种云存储服务&#xff0c;提供了高可靠、高性能、安全的数据存储能力。通过使用OBS的命令行工具obsutil&#xff0c;用户可以方便地进行文件上传、下载、删除等操作&#xff0c;而无需依赖图形界面。下面&#xff0c…...

避免超卖!深入解析高并发分布式锁架构

1.引入并发控制的必要性 并发控制是一切分布式系统设计的基石&#xff0c;确保数据一致性、系统稳定性和最终的用户体验。要理解为什么需要并发控制&#xff0c;就必须先探讨并发对系统可能造成的问题。 1.1. 理解并发问题 多线程和分布式环境中&#xff0c;无数的进程和线程…...