当前位置: 首页 > news >正文

RabbitMQ消息队列

简介

MQ(message queue),从字面意思上看就个 FIFO 先入先出的队列,只不过队列中存放的内容是 message 而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。

作用:流量削峰、应用解耦、异步处理。

在这里插入图片描述
生产者将消息发送到消息队列中,消息队列负责转发消息给消费者,消费者在处理完消息后会对消息队列进行应答,消息队列收到应答信息会将相应的消息进行丢弃。

批量应答会导致高并发时消息的丢失,所以尽力以channel.ack()进行手动应答。

docker安装

  1. 拉取镜像并后台运行
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=yi -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq

需要将RABBITMQ_DEFAULT_USER、RABBITMQ_DEFAULT_PASS改成自己的用户名、密码。

  1. 开启manager插件,可以在网页进行管理。
 docker exec -it 容器id /bin/bash  #这里可以用docker ps 查询刚刚开启的容器id#进入容器后输入,开启rabbitmq-plugins enable rabbitmq_management

可以登录 http://服务器IP:15672 访问web管理界面,访问成功则代表开启成功。

JAVA环境搭建

jar包:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version></dependency></dependencies>

Helloworld实例

生产者

public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");//获取连接Connection connection = connectionFactory.newConnection();//获取信道,一个连接中有多个信道Channel channel = connection.createChannel();//声明一个队列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsAMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";//(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送成功");}

消费者

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback=(String var1)->{System.out.println("消息消费被中断");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}

工作队列(任务队列)

RabbitMQ默认为工作队列模式,消费者C1,C2为竞争关系,接收到的消息将轮询发送给C1,C2处理,即C1一条C2一条依次循环。
在这里插入图片描述

手动应答ack

因为自动应答不会考虑消息是否处理成功,所以可能会导致消息丢失,需要在代码中将自动应答改为手动应答。批量应答在高并发的时候也容易丢失消息,也应该关闭。

生产者的代码无需修改。
消费者:

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("work2 waiting:");DeliverCallback deliverCallback= (String s, Delivery delivery)->{System.out.println(new String(delivery.getBody()));// do something//手动回复ack,false为关闭批量应答channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(s)->{System.out.println("消息被打断");};//false表示不自动应答ackchannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}

不公平分发

会存在有些线程能力差耗时长,有些能力强耗时短的情况,不公平分发将实现能者多劳。
设立channel的basicQos即可实现不公平分发, basicQos的数值意味着channel的最大存储上限,channel为1时,消费者最多同时缓存一条待处理消息。

channel.basicQos(1);

发布确认

在开启队列持久化、消息持久化后,RabbitMQ服务器仍然可能在将消息存储在磁盘前宕机,需要发布确认才能保证消息不丢失,即RabbitMQ在存储磁盘成功后,发送确认给生产者。

单个发布确认

每条消息存储在磁盘后进行发布确认,只有发送者在接收到消费者对应的发布确认消息后才会给此消费者发送下一条消息。

public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());       boolean flag = channel.waitForConfirms(); //等待发布确认if(flag){System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

批量发布确认

每发送100条消息进行一次发布确认。速度快,但是不知道具体是哪一条消息发送失败了。

public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());if(i%100==0){boolean flag = channel.waitForConfirms(); //等待发布确认if(flag){System.out.println("消息发送成功");}}}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

异步发布确认

推荐使用,需要加入确认发布监听器confirmListener,并且记录序列号与消息的关联(ConcurrentSkipListMap)。

 public static void publicMsgAsync()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认//       将序列号与信息相关联,ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<Long,String>();//加入确认监听器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long msgTag, boolean multiply) throws IOException {System.out.println("消息发送成功:"+msgTag);if(multiply) { //如果是批量确认,批量删除//headMap返回小于msgTag的map视图ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(msgTag);//清理已经标记的MapconcurrentNavigableMap.clear();}else {concurrentSkipListMap.remove(msgTag);}}@Overridepublic void handleNack(long msgTag, boolean multiply) throws IOException {System.out.println("未确认的消息:"+concurrentSkipListMap.get(msgTag));}});long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());//记录发送的信息与其序列号concurrentSkipListMap.put(channel.getNextPublishSeqNo(),new String(i+" "));}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

发布/订阅模式(fanout交换机)

首先要弄明白交换机和队列的关系,交换机负责信息的接收,通过不同的RountingKey将消息转发到不同的队列,每个队列上的接收者都是竞争关系(即队列上的消息只会被处理一次),那么当一个交换机对应多个队列时,每个队列仅有一个消费者,这个时候即发布/订阅模式,消息会被每个消费者接收。
在这里插入图片描述

生产者代码:向交换机中发送消息

public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish(EXCHANGE_NAME,"", null,next.getBytes());}}

消费者代码:声明匿名队列,将队列绑定到交换机上,不同的消费者用相同的RountingKey,以便同时接收到消息。

public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //FANOUT煽出,就是发布订阅模式String queue = channel.queueDeclare().getQueue(); //声明匿名队列channel.queueBind(queue,EXCHANGE_NAME,""); //将队列绑定到交换机上,RountingKey为“”DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody()));};channel.basicConsume(queue,true,deliverCallback, (consumerTag)->{});}

Direct交换机

与fanout模式相比,不同的队列有不同的Rounting key,通过Rounting Key能够直接向指定队列发送消息。

Topic交换机

rountingKey作为匹配串,发送消息时,匹配上的则能进行发送。
routingKey必须是单词列表,用.隔开。如aa.bb.cc
*可以代表一个单词 ,#可以代表若干个单词
比如向rountingKey为aa.orange.rabbit发送消息,Q1和Q2都能接收到消息,而向aa.orange.bb发送消息则只有Q1能够接收到消息。
在这里插入图片描述
当队列的rountingKey绑定的#,则相当于fanout煽出交换机。
当队列的rountingKey绑定不带#*时,相当于direct交换机。

死信队列

在队列中1消息超时、2无法处理、3队列已满时,消息会被送入死信队列。

相关文章:

RabbitMQ消息队列

简介 MQ(message queue)&#xff0c;从字面意思上看就个 FIFO 先入先出的队列&#xff0c;只不过队列中存放的内容是 message 而已&#xff0c;它是一种具有接收数据、存储数据、发送数据等功能的技术服务。 作用&#xff1a;流量削峰、应用解耦、异步处理。 生产者将消息发送…...

ModBus电表与RS485电表有哪些区别?

在能源计量领域&#xff0c;ModBus电表和RS485电表是两种常见的设备&#xff0c;它们都具有监测和记录电能数据的功能。然而&#xff0c;它们之间存在一些区别&#xff0c;比如通信协议、连接方式、数据格式等等参数的区别有哪些&#xff1f; ModBus电表和RS485电表都是用于电能…...

vue项目运行时,报错:ValidationError: webpack Dev Server Invalid Options

在运行vue项目中&#xff0c;遇到报错&#xff1a;ValidationError: webpack Dev Server Invalid Options&#xff0c;如下图截图&#xff1a; 主要由于vue.config.js配置文件错误导致的&#xff0c;具体定位到proxy配置代理不能为空&#xff0c;导致运行项目报错&#xff0c;需…...

书摘:C 嵌入式系统设计模式 02

本书的原著为&#xff1a;《Design Patterns for Embedded Systems in C ——An Embedded Software Engineering Toolkit 》&#xff0c;讲解的是嵌入式系统设计模式&#xff0c;是一本不可多得的好书。 本系列描述我对书中内容的理解。 结构化编程将软件组织成两个截然不同的…...

排序算法基本原理及实现1

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️宝剑锋从磨砺出&#xff0c;梅花香自苦寒来 &#x1f4d1;插入排序 &#x1f4…...

Unity 轨道展示系统(DollyMotion)

DollyMotion &#x1f371;功能展示&#x1f959;使用&#x1f4a1;设置路径点&#x1f4a1;触发点位切换&#x1f4a1;动态更新路径点&#x1f4a1;事件触发&#x1f4a1;设置路径&#x1f4a1;设置移动方案固定速度方向最近路径方向 &#x1f4a1;设置移动速度曲线 传送门 &a…...

优维低代码实践:搜索功能

优维低代码技术专栏&#xff0c;是一个全新的、技术为主的专栏&#xff0c;由优维技术委员会成员执笔&#xff0c;基于优维7年低代码技术研发及运维成果&#xff0c;主要介绍低代码相关的技术原理及架构逻辑&#xff0c;目的是给广大运维人提供一个技术交流与学习的平台。 优维…...

C# ReadOnlyRef Out

C# ReadOnly ReadOnly先看两种情况1.值类型2.引用类型 结论 Ref Out ReadOnly官方文档 ReadOnly 先看两种情况 1.值类型 当数据是值类型时&#xff0c;标记为Readonly时&#xff0c;如果再次设置值&#xff0c;会提示报错&#xff0c;无法分配到只读字段 public class A {pri…...

linux 服务 下 redis 安装和 启动

官网下载 https://redis.io/download/ 安装步骤&#xff1a; 1.安装redis 所需要的依赖 yum install -y gcc tcl2.上传安装包并解压&#xff0c;下载安装包&#xff0c;上传到/usr/local/src目录&#xff0c;解压 tar -zxvf redis-7.2.3.tat.gz进入安装目录&#xff0c;运行…...

ECharts与Excel的结合实战

引言&#xff1a;本文是一篇ECharts和Excel实战的记录。将Excel与ECharts产生火花&#xff0c;从Excel读取数据然后在ECharts上展示。 1.柱状图前端代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title…...

UDP的特点及应用场景

目录 UDP特点 应用场景 总结 User Datagram Protocol&#xff08;UDP&#xff0c;用户数据报协议&#xff09;是互联网协议套件中的一种传输层协议。与TCP不同&#xff0c;UDP是一种无连接的、不可靠的协议。 UDP特点 要知道UDP可以用来做什么&#xff0c;首先我们要知道它…...

Python开发——工具篇 Pycharm的相关配置,Python相关操作 持续更新

前言 本篇博客是python开发的工具篇相关&#xff0c;介绍pycharm的使用和相关配置&#xff0c;收录python的相关操作&#xff0c;比如如何启动jupyter。 目录 前言引出Pycharmpycharm如何不同等级日志显示不同颜色设置不同pycharm的python环境 Python操作如何启动Jupyter 总结…...

【深度学习】卷积神经网络结构组成与解释

卷积神经网络是以卷积层为主的深度网路结构&#xff0c;网络结构包括有卷积层、激活层、BN层、池化层、FC层、损失层等。卷积操作是对图像和滤波矩阵做内积&#xff08;元素相乘再求和&#xff09;的操作。 1. 卷积层 常见的卷积操作如下&#xff1a; 卷积操作解释图解标准卷…...

从源码解析Containerd容器启动流程

从源码解析Containerd容器启动流程 本文从源码的角度分析containerd容器启动流程以及相关功能的实现。 本篇containerd版本为v1.7.9。 更多文章访问 https://www.cyisme.top 本文从ctr run命令出发&#xff0c;分析containerd的容器启动流程。 ctr命令 查看文件cmd/ctr/comman…...

引迈-JNPF低代码项目技术栈介绍

从 2014 开始研发低代码前端渲染&#xff0c;到 2018 年开始研发后端低代码数据模型&#xff0c;发布了JNPF开发平台。 谨以此文针对 JNPF-JAVA-Cloud微服务 进行相关技术栈展示&#xff1a; 1. 项目前后端分离 前端采用Vue.js&#xff0c;这是一种流行的前端JavaScript框架&a…...

如何处理枚举类型(下)

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 上一篇我们通过编写MyB…...

wsj0数据集原始文件.wv1.wv2转换成wav文件

文章目录 准备一、获取WSJO数据集二、安装sph2pipe三、转换代码四、结果展示 ​ 最近做语音分离实验需要用到wsj0-2mix数据集&#xff0c;但是从李宏毅语音分离教程里面获取的wsj0-2mix只有一部分。从网上获取到了完整的WSJO数据集后&#xff0c;由于原始的语音文件后缀是wv1或…...

Flask Session 登录认证模块

Flask 框架提供了强大的 Session 模块组件&#xff0c;为 Web 应用实现用户注册与登录系统提供了方便的机制。结合 Flask-WTF 表单组件&#xff0c;我们能够轻松地设计出用户友好且具备美观界面的注册和登录页面&#xff0c;使这一功能能够直接应用到我们的项目中。本文将深入探…...

【运维】hive 高可用详解: Hive MetaStore HA、hive server HA原理详解;hive高可用实现

文章目录 一. hive高可用原理说明1. Hive MetaStore HA2. hive server HA 二. hive高可用实现1. 配置2. beeline链接测试3. zookeeper相关操作 一. hive高可用原理说明 1. Hive MetaStore HA Hive元数据存储在MetaStore中&#xff0c;包括表的定义、分区、表的属性等信息。 hi…...

C#开发的OpenRA游戏之属性SelectionDecorations(13)

C#开发的OpenRA游戏之属性SelectionDecorations(13) 在前面分析SelectionDecorations属性类时,会发现它有下面这个属性: public class SelectionDecorations : SelectionDecorationsBase, IRender { readonly Interactable interactable; 它是定义了一个Interactabl…...

嵌入式C语言设计模式实践:观察者与责任链模式

1. 嵌入式软件开发中的设计模式应用背景在传统认知中&#xff0c;嵌入式系统开发往往与"资源受限"、"底层硬件"、"效率优先"等标签紧密关联。早期的嵌入式设备功能单一&#xff0c;业务逻辑简单&#xff0c;开发者更关注代码的执行效率和硬件资源…...

大厂面试真题揭秘:38W-55W年薪,大模型算法工程师核心考点全解析!

面试信息 岗位&#xff1a;大模型应用算法工程师-电商方向 类别&#xff1a;算法类 - 自然语言处理 地点&#xff1a;杭州 bg:普通211 渣硕 薪资情况 薪资构成&#xff1a;16 薪&#xff0c;属于互联网第一梯队。 硕士 总包&#xff1a;38W ~ 55W / 年普通档&#xff1a;38W ~ …...

五大PHP框架对比:如何选择最适合你的?

PHP 常用的框架包括&#xff1a;1. Laravel特点&#xff1a;优雅的语法、强大的 ORM&#xff08;Eloquent&#xff09;、丰富的扩展包&#xff08;Composer&#xff09;、完善的文档。适用场景&#xff1a;中大型 Web 应用、API 开发、需要快速构建复杂功能。2. Symfony特点&am…...

【数据可视化实战】从API到图表:一步步构建奥运奖牌榜与运动员数据分析平台

1. 数据获取&#xff1a;从API到结构化数据 做数据分析的第一步永远是获取数据。这次我们选择奥运奖牌榜和运动员数据作为案例&#xff0c;主要是因为这类数据公开透明且结构清晰&#xff0c;非常适合新手练手。我实测下来&#xff0c;咪咕视频的奥运数据接口非常稳定&#xf…...

93.91%压缩率背后的技术革命:CompressO如何解决企业级视频处理的效率困境

93.91%压缩率背后的技术革命&#xff1a;CompressO如何解决企业级视频处理的效率困境 【免费下载链接】compressO Convert any video/image into a tiny size. 100% free & open-source. Available for Mac, Windows & Linux. 项目地址: https://gitcode.com/gh_mirr…...

原神玩家必备:Snap Hutao工具箱5大核心功能让游戏体验升级

原神玩家必备&#xff1a;Snap Hutao工具箱5大核心功能让游戏体验升级 【免费下载链接】Snap.Hutao 实用的开源多功能原神工具箱 &#x1f9f0; / Multifunctional Open-Source Genshin Impact Toolkit &#x1f9f0; 项目地址: https://gitcode.com/GitHub_Trending/sn/Snap…...

三三复制小公排小程序开发指南

了解三三复制模式三三复制是一种常见的分销或团队裂变模式&#xff0c;通常用于社交电商或会员制营销。该模式通过用户邀请新用户加入并形成层级关系&#xff0c;实现快速推广。在小程序中实现该功能需要设计合理的用户关系和奖励机制。开发前的准备工作注册微信小程序开发者账…...

CKKS 同态加密数学基础推导嗡

背景 StreamJsonRpc 是微软官方维护的用于 .NET 和 TypeScript 的 JSON-RPC 通信库&#xff0c;以其强大的类型安全、自动代理生成和成熟的异常处理机制著称。在 HagiCode 项目中&#xff0c;为了通过 ACP (Agent Communication Protocol) 与外部 AI 工具&#xff08;如 iflow …...

机械设备出口单证操作全攻略

# 【外贸干货】机械设备出口单证操作全攻略&#xff1a;新手必看的报关、信用证、原产地证实操指南 ## 前言 做机械外贸&#xff0c;产品谈好了、合同签了&#xff0c;接下来最让新手头疼的就是单证操作。 报关单填错了&#xff0c;货物被扣&#xff1b;信用证软条款没发现&…...

AI辅助开发新体验:描述需求,让快马AI直接打开一个情感分析应用

AI辅助开发新体验&#xff1a;描述需求&#xff0c;让快马AI直接打开一个情感分析应用 最近在尝试用AI辅助开发&#xff0c;发现InsCode(快马)平台的体验真的很惊艳。以前做个简单的文本情感分析&#xff0c;得自己找数据集、训练模型、写前后端代码&#xff0c;现在只需要用自…...