当前位置: 首页 > news >正文

【RabbitMQ(day2)】默认(直连)交换机的应用

文章目录

  • 一、第一种模型(Hello World)
  • 二、第二种模型(work queue)
    • 自动确认机制的后果和公平分配
  • 三、阐述默认交换机

这篇博客是以下资料学后的总结:
不良人的RabbitMQ的教学视频
官方启动教程
RabbitMQ中文文档

一、第一种模型(Hello World)

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序;
  • C:消费者:消息的接受者,会一直等待消息的到来。
  • queue:消息队列,图中红色部分。类似于一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  1. 开发生产者
		// 创建连接mq的连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置连接Rabbitmq的主机connectionFactory.setHost("192.168.248.135");// 设置端口号connectionFactory.setPort(5672);// 设置连接那个虚拟主机connectionFactory.setVirtualHost("/ems");// 设置用户名和密码connectionFactory.setUsername("ems");connectionFactory.setPassword("123456");// 获取连接对象Connection connection = connectionFactory.newConnection();// 获取连接中的通道对象Channel channel = connection.createChannel();// 通过通道绑定对应的消息队列// 参数1:队列的名称  如果队列不存在会自动创建// 参数2:用来定义队列特性是否需要持久化,true:持久化队列,false即不持久化// 参数3:exclusive 是否独占队列// 参数4:是否在消费完成后自动删除队列// 参数5:额外参数// 这个不加是没关系的,只是表示我的Rabbitmq中是有hello消息队列的,消费者产生的
channel.queueDeclare("hello",false,false,false,null);// 发布消息// 参数1:交换机名称;参数2:路由键名称;参数3:传递消息额外设置;参数4:消息的具体内容channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());channel.close();connection.close();
  1. 消费者开始消费
 		// 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 设置主机和端口connectionFactory.setHost("192.168.248.135");connectionFactory.setPort(5672);// 设置虚拟主机connectionFactory.setVirtualHost("/ems");// 设置用户名和密码connectionFactory.setUsername("ems");connectionFactory.setPassword("123456");// 创建连接对象Connection connection = connectionFactory.newConnection();// 创建通道Channel channel = connection.createChannel();// 通道绑定对象channel.queueDeclare("hello",false,false,false,null);// 消费消息// 参数1:消费哪个队列的消息  队列名称// 参数2:开始消息的自动确认机制// 参数3:消费消息时的回调接口String hello = channel.basicConsume("hello", true, new DefaultConsumer(channel){// 最后一个参数:消息队列中取出的消息@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("转化成对应的字符串: " + new String(body));System.out.println("============");}});/*channel.close();connection.close();*/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vR02ySmZ-1690641488995)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230727224814359.png)]

这里需要注意,由于这里是多线程下生产、消费消息,所以在消费时不应该提前关闭通道,不然无法监听到队列中的数据。

下面是证明,看看各线程的名称就知道了。

在这里插入图片描述

需注意:Junit5之前是不支持多线程的。

  1. 参数说明
channel.queueDeclare("hello",true,false,true,null);
"参数1":用来声明通道对应的队列;
"参数2":用来指定是否持久化队列
"参数3":用来指定是否独占队列,一般为false
"参数4":用来指定是否自动删除队列
"参数5":对队列的额外配置参数1:交换机名称;参数2:路由键名称;参数3:传递消息额外设置;参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"I love you~".getBytes());

二、第二种模型(work queue)

Work Queues,也被称为(Task Queues 任务模型)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-55cGZian-1690641488996)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729113349745.png)]

角色:

  • Sender:生产者:任务的发布者
  • Consumer:消费者,领取任务并且完成任务

生产者代码

        // 获取连接对象Connection conn = RabbitMQUtils.getConnection();// 获取通道对象Channel channel = conn.createChannel();// 通过通道声明队列channel.queueDeclare("work", true, false, false, null);for (int i = 0; i < 10; ++i) {// 生产消息channel.basicPublish("", "work", null, ("(" + i + ")Hello worke queue~").getBytes());}// 关闭资源RabbitMQUtils.closeConnectionAndChanel(conn, channel);

消费者代码

Runnable myRunnable = new Runnable() {@Overridepublic void run() {Connection conn = RabbitMQUtils.getConnection();try {final int[] cnt = new int[1];cnt[0] = 0;Channel channel = conn.createChannel();channel.queueDeclare("work", true, false, false, null);System.out.println("当前线程:" + Thread.currentThread().getName());channel.basicConsume("work", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("消费者-" + consumerTag + ":" + new String(body));System.out.println("================================================");cnt[0]++;}});System.out.println(Thread.currentThread().getName() + ":" + cnt[0]);} catch (Exception e) {e.printStackTrace();}}};Thread work1 = new Thread(myRunnable, "work-001");Thread work2 = new Thread(myRunnable, "work-002");Thread work3 = new Thread(myRunnable, "work-003");work1.start();work2.start();work3.start();

测试结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CgpB8a9p-1690641488997)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729125901348.png)]

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j1M02g6j-1690641488998)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729190556081.png)]

下面是官方给的

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

自动确认机制的后果和公平分配

channel.basicConsume("work", true, new DefaultConsumer(channel)
// 这里的第二个参数是指是否默认提交

上面 work queue 实现会出现两个问题:

问题一:这里将 autoAcked 参数值设置为了 true,即消费者收到消息队列调度的消息后不管有没有消费成功都立即返回 ACK 确认,消息队列只顾着轮询分配去了。这个时候的话会引发一个问题:当消费者突然宕机了,那还没处理的消息就不会被处理,即消失了。比如一个消费者被分配到了五个消息,但是只处理了三个就嘎了,那剩下的俩个就处理不了了。

问题二:在两个worker的情况下,当所有奇数消息都很重,偶数消息都很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。但是RabbitMQ对此一无所知,仍然会均匀地分发消息。我们应该遵循能者多劳,充分利用资源,但轮询方式总是这么的不合我们的胃口。

解决方案

  • 首先得将自动提交设置为 false,手动提交就好了;

  • 每一次给空闲发消费者一个消息,即设置 prefetchCount = 1,这样的话不会让能者出现不工作,懒者一堆事没做的情况。当消费者死亡(即通道关闭、连接被关闭、或者TCP连接丢失等情况)还没有发送ACK,那有其他消费者在线的话,消息队列会将消息迅速交付给另一个消费者,从而确保消息没有丢失。

    具体解决方案的伪代码如下:

// 配置每一次只能执行一个小希
channel.basicQos(1);
// 关闭手动提交
channel.basicConsume("work", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("消费者-" + consumerTag + ":" + new String(body));// 参数1:确认队列中哪些具体消息 参数2:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(), false);}
});

三、阐述默认交换机

可以在 RabbitMQ中文文档-默认交换机 去了解更多AMQP协议的一些内容。

默认交换机的本质是直连交换机,当你添加一个队列的时候,这个队列第一反应就是绑定默认交换机,而绑定(binding)的路由键名称和队列名称是一致的。

上面两种模型(RabbitMQ官方教程阐述的)Hello World模型和Work Queues模型,在官方教程中没有指出使用了交换机,但是本质都是绑定了默认交换机的,也就是直连交换机,它也是支持多消费者的负载均衡的。

首先必须知道的是:使用默认交换机时,队列是在消费者端创建的(可以说是用户本身吧),而不是生产者去创建的。当生产者发送一条消息到 RabbitMQ 时,RabbitMQ 会根据消息的路由键(在使用默认交换机的情况下,路由键即为队列名称)来查找是否已经存在该队列,如果队列不存在,则会丢弃该消息。

basicPublish 方法的第二个参数为路由键名称和 basicConsume 方法的第一个参数为队列名称也是可以看出来的。

总的来说就是生产者不需要关心队列的创建,这是消费者需要声明指定的,默认交换机会绑定声明的消息队列的,所以生产者该发发,创建的任务就不用管了。

相关文章:

【RabbitMQ(day2)】默认(直连)交换机的应用

文章目录 一、第一种模型&#xff08;Hello World&#xff09;二、第二种模型&#xff08;work queue&#xff09;自动确认机制的后果和公平分配 三、阐述默认交换机 这篇博客是以下资料学后的总结&#xff1a; 不良人的RabbitMQ的教学视频 官方启动教程 RabbitMQ中文文档 一、…...

谷粒商城第八天-商品服务之品牌管理的整体实现(直接使用逆向生成的代码;含oss文件上传)

目录 一、总述 二、前端部分 2.1 创建好品牌管理菜单 2.2 复制组件 ​编辑2.3 复制api ​​​编辑 2.4 查看效果 ​编辑2.5 需要优化的地方 2.6 具体优化实现 2.6.1 优化一&#xff1a;将表格的状态列&#xff08;这里是是否显示列&#xff09;修改为开关&#xff…...

阿里云率先荣获容器集群稳定性先进级认证

7 月 25 日&#xff0c;由中国信通院发起的“2023 稳保体系”评估结果在可信云大会现场公布&#xff0c;阿里云容器服务 ACK 成为首批通过“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c;并荣获“先进级”认证。 云原生技术正在激活应用构建新范式&#xff0c;构…...

【SpringBoot笔记37】SpringBoot基于@ServerEndpoint、@OnMessage等注解的方式集成WebSocket

这篇文章,主要介绍SpringBoot基于@ServerEndpoint、@OnMessage等注解的方式集成WebSocket。 目录 一、基于注解集成WebSocket 1.1、WebSocket常见注解 1.2、创建WebSocket服务端 1.3、配置ServerEndpointExpor...

PyTorch(安装及卸载)

目录 1. 安装 2. 卸载 参考文献 为什么用PyTorch&#xff1a;简单来说&#xff0c;19年之前tensorflow是大哥&#xff0c;19年tensorflow和PyTorch双龙并行&#xff0c;20年之后PyTorch一往无前。宗旨&#xff0c;哪个用的人多用哪个。 1. 安装 1. 先打开Anaconda Prompt&…...

webScoket

webScoket是什么&#xff1f; 支持端对端通讯可以由客户端发起&#xff0c;也可以有服务端发起用于消息通知、直播间讨论区、聊天室、协同编辑等 做一个简单的webScoket 客户端配置&#xff1a; 1、新建一个页面叫web-scoket.html <!DOCTYPE html> <html lang"…...

【C语言初阶(20)】调试练习题

文章目录 前言实例1实例2 前言 在我们开始调试之前&#xff0c;应该有个明确的思路&#xff1b;程序是如何完成工作的、变量到达某个步骤时的值应该是什么、出现的问题大概会在什么位置。这些东西在调试之前都需要先确认下来&#xff0c;不然自己都不知道自己在调试个什么东西…...

MicroPython ESP32网页实时更新DHT11数据显示

MicroPython ESP32网页实时更新DHT11数据显示 &#x1f4cc;相关篇《MicroPython ESP32 读取DHT11温湿度传感器数据》&#x1f4cd;《【Micropython esp32/8266】网页点灯控制示例》 ✨本例综合以上两篇文章内容实现&#xff1a;在本地网页中显示DHT11温度传感器数据。可以做到…...

JavaWeb之HTML基础篇(一)

系列文章目录 HTML基础篇&#xff08;一&#xff09; 文章目录 系列文章目录HTML基础篇&#xff08;一&#xff09;[TOC](文章目录) 前言一、HTML简介1.1介绍1.2HTML文件的书写规范1.3 HTML标签介绍1.4 HTML常见的标签 二、CSS的简介2.1css技术介绍2.2 CSS与HTML结合的三种方式…...

TVM_深度学习编译器

TVM_深度学习编译器 TVM所做的是要比传统compiler更偏上层的,你可以把它理解成source-to-source compiler,需要其他的后端(backend)来生成最后的指令。比如当编译的Target是Intel CPU时,翻译的顺序是Relay IR -> TVM IR/ Halide IR -> LLVM IR,之后交给LLVM生成最后…...

Flutter InheritedWidget 共享状态管理

InheritedWidget和React中的context功能类似&#xff0c;可以实现跨组件数据的传递。 定义一个共享数据的InheritedWidget&#xff0c;需要继承自InheritedWidget 这里定义了一个of方法&#xff0c;该方法通过context开始去查找祖先的HYDataWidget&#xff08;可以查看源码查找…...

什么是反射?Java反射?反射的优缺点

目录 什么是反射&#xff08;Reflection &#xff09;&#xff1f;Java反射&#xff1f;反射的优缺点获取Class对象的三种方式&#xff1a;java反射技术的应用场景 什么是反射&#xff08;Reflection &#xff09;&#xff1f; 主要是指程序可以访问、检测和修改它本身状态或行…...

小红书2020校招测试开发后端笔试题卷三

//完全背包求组合数 #include <iostream> #include<vector> #include<set> #include<map> #include<algorithm> using namespace std; int value[300]; // vector<int>vis; // vector<int>vis1; map<vector<int>,int>m…...

python数据可视化Matplotlib

1.绘制简单的折线图 # -*- coding: utf-8 -*- import matplotlib.pyplot as pltinput_values [1, 2, 3, 4, 5] squares [1, 4, 9, 16, 25] plt.style.use(seaborn) fig, ax plt.subplots() ax.plot(input_values, squares, linewidth3) # 线条粗细# 设置图表标题并给坐标…...

firewalld防火墙

firewalld防火墙 它属于包过滤防火墙。工作在网络层&#xff0c;是centos7自带的默认防火墙。主要是取代iptables。 firewalld的两种配置模式&#xff1a; 分别是运行时配置和永久配置。 iptable是静态防火墙 firewalld是动态防火墙 它是按照区域来划分的&#xff0c;有9…...

iMacros WebBrowser Component for .NET

iMacros WebBrowser Component for .NET 在几分钟内实现应用程序自动化 快速轻松地将iMacro集成到您的应用程序中。不需要单独的安装程序。 无缝集成 iMacros与您的.NET应用程序无缝集成&#xff0c;作为Microsoft WebBrowser控件的替代品。它甚至可以用作每个.NET应用程序中的…...

3,堆,桶排序,排序总结【p4-p5】

桶排序&#xff0c;排序总结 3.1堆3.1.1堆结构3.1.1.1完全二叉树3.1.1.2堆分为大根堆和小根堆3.1.1.2.1案例1-去掉最大数字&#xff0c;使剩下的数字依然是大根堆3.1.1.2.2案例23.1.1.2.3案例33.1.1.2.4案例4-方便制造大根堆方法 3.1.2堆排序3.1.2.1案例-堆排序的扩展3.1.2.1.1…...

使用langchain与你自己的数据对话(四):问答(question answering)

之前我已经完成了使用langchain与你自己的数据对话的前三篇博客&#xff0c;还没有阅读这三篇博客的朋友可以先阅读一下&#xff1a; 使用langchain与你自己的数据对话(一)&#xff1a;文档加载与切割使用langchain与你自己的数据对话(二)&#xff1a;向量存储与嵌入使用langc…...

如何快速开拓海外华人市场?附解决方案!

开拓华人市场对于企业来说是非常必要的。华人市场庞大且潜力巨大&#xff0c;拥有巨额的消费能力。随着华人经济的不断增长&#xff0c;越来越多的企业开始意识到华人市场的重要性。 通过开拓华人市场&#xff0c;企业可以获得更多的销售机会&#xff0c;并且在竞争激烈的市场…...

【云原生-制品管理】制品管理的优势

制品介绍制品管理-DevOps制品管理优势总结 制品介绍 制品管理指的是存储、版本控制和跟踪在软件开发过程中产生的二进制文件或“制品”的过程。这些制品可以包括编译后的源代码、库和文档&#xff0c;包括操作包、NPM 和 Maven 包&#xff08;或像 Docker 这样的容器镜像&…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

uniapp 小程序 学习(一)

利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 &#xff1a;开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置&#xff0c;将微信开发者工具放入到Hbuilder中&#xff0c; 打开后出现 如下 bug 解…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

goreplay

1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具&#xff0c;可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长&#xff0c;测试它所需的工作量也会呈指数级增长。GoRepl…...

HTTPS证书一年多少钱?

HTTPS证书作为保障网站数据传输安全的重要工具&#xff0c;成为众多网站运营者的必备选择。然而&#xff0c;面对市场上种类繁多的HTTPS证书&#xff0c;其一年费用究竟是多少&#xff0c;又受哪些因素影响呢&#xff1f; 首先&#xff0c;HTTPS证书通常在PinTrust这样的专业平…...