200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)
RabbitMQ 工作机制图:
Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。
Channel: 连接内部的Channel。channel:通道
Exchange: 充当消息交换机的组件。
Queue: 消息队列。
★ 消费消息
使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。
(2)通过Connection获取Channel。
(3)根据需要,调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,
该参数相当于JMS中的消息监听器。
这个 basicConsume()方法 相当于是异步消费。
而同步消费会出现阻塞情况,这就失去消息中间件存在的意义,所以先讲异步消费。
★ 发送消息
使用RabbitMQ Java Client依赖库开发消息生产者的大致步骤如下:
(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。
(2)通过Connection获取Channel。
(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
Declare:声明、宣布
(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。
【注意】:虽然消息生产者与队列是完全隔离的, 但如果消息生产者不声明消息队列,那系统中就可能暂时还没有任何消息队列。
在这种情况下,消息生产者向Exchange发送的消息将不会分发给任何队列,这些消息直接就被丢弃了。
【备注】:为了保证消息生产者能将消息发送到指定队列,消息生产者需要声明消息队列,保证消息队列的存在。
**问题:**消息生产者 和 消息队列 是完全隔离的,但是生产者为什么还要声明消息队列?
**原因:**因为程序如果先运行消息生产者,后运行消费者,而声明消息队列的方法又只存在消费者那边,那么在先运行消息生产者时,就会因为还没有生成消息队列,所以生产者发送到exchange的消息,会因为没有对应的消息队列而被丢弃。
代码演示:
先创建一个普通的 maven 项目。
添加一些属性 和 RabbitMQ的依赖
创建消息消费者
把创建连接的代码封装到一个方法里去。
消费者的代码
注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
这个演示已消费未确认的演示放最后
执行消费者
控制台查看
原本没有这个消息队列,通过调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
一开始消费者声明的这个消息队列,这个是否独占的exclusive 参数我是写true,
所以下图的 myQueue01的 Features 就是 Excl
这个就是创建的消费者。
用于消费这个 myQueue01 消息队列的消费者。
后面把 exclusive 改成了 false,是因为后面的生产者,需要也声明这个 myQueue01 消息队列,而如果这个消息队列是 独占的,就没法声明了,所以改成 false
创建消息生产者
生产者发送完消息就会关闭资源
消费者则是一直启动着
测试
先启动消费者或者启动生产者都一样,因为生产者和消费者都有调用queueDeclare() 方法声明消息队列,所以不存在发送消息后没找到对应的消息队列而导致消息被丢弃的情况。
启动消费者
然后启动生产者
生产者发送消息
再看消费者,已经消费了一条消息了。
因为先启动消费者,所以生产者发送的消息马上被消费了,在控制台的队列就看不到了。
再测试:
先启动生产者
关闭消费者,然后启动生产者发送消息
可以看出消息已经生产发送到消息队列了
这一步的流程图
启动消费者消费消息
流程图:
已消费未确认
注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
如图:因为 autoAck 为false , 所以消费者消费消息后没有进行确认。这里的 unacked 条数就为1.
如果改成 autoAck 为false ,那么消费者消费消息的代码,要加上确认消息的方法。
这个就是手动确认消息。
完整代码:
ConnectionUtil 连接工具类
package cn.ljh.app.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
P2PProducer 生产者
package cn.ljh.app.rabbitmq.producer;import cn.ljh.app.rabbitmq.consumer.P2PConsumer;
import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用默认的exchange
public class P2PProducer
{//(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。//(2)通过Connection获取Channel。//(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。// 如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。//(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,// 第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定//此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列channel.queueDeclare(P2PConsumer.QUEUE_NAME, true, false, false, null);String message = "生产者发送的消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,P2PConsumer.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
P2PConsumer 消费者
package cn.ljh.app.rabbitmq.consumer;import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者
public class P2PConsumer
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE_NAME = "myQueue01";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列//如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列//参数1:声明的队列名; 参数2:消息队列是否持久化//参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。//参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmqtest</artifactId><version>1.0.0</version><name>rabbitmqtest</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>
相关文章:

200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)
RabbitMQ 工作机制图: Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。 Channel: 连接内部的Channel。channel:通道 Exchange: 充当消息交换机的组件。 Queueÿ…...
SqlServer--get 和 post 请求 http接口
1. 开启 不开启报错 如下 4.1 SQL Server blocked access to procedure ‘sys.sp_OACreate’ sp_configure show advanced options, 1;GORECONFIGURE;GOsp_configure Ole Automation Procedures, 1;GORECONFIGURE;GO2. post Declare ServiceUrl nvarchar(MAX) Declare req_…...

利用人工智能提升企业培训的个性化体验
随着科技的不断进步,人工智能(AI)正逐渐渗透到各个领域。而在企业培训领域,人工智能也展现出了巨大的潜力。利用人工智能技术的企业培训系统,企业可以为员工提供个性化、高效的培训体验,进一步提升他们的专…...

基于JavaWeb的图书售卖网站(源码+部署+LW)
项目描述 临近学期结束,还是毕业设计,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于JavaWeb的图书售卖网…...
Java设计模式之代理模式
代理模式是一种结构型设计模式,它允许通过创建一个代理对象来控制对另一个对象的访问。代理模式在软件开发中经常被使用,它可以提供额外的功能,例如远程访问、延迟加载、访问控制和日志记录等。 代理模式涉及三个主要角色: 抽象…...
Oracle数据泵导入和导出命令
–管理员方式登录,新建表空间和用户,并建立文件夹映射路径并授权 CREATE DIRECTORY directory_name AS ‘path_to_directory’; grant read,write on directory directory to backup ** —EXPDP多线程备份数据库脚本–dblink–可以修改为命令行 echo …...
Linux中所有环境变量配置文件及用途
在Linux系统中,有多个文件可以用来配置环境变量,这些文件位于不同的目录和层级,并可以用于不同的目的。以下是一些常见的环境变量配置文件: 系统级环境变量文件: /etc/environment:这个文件包含了系统范围的…...

一文读懂flutter线程: 深入了解Flutter中的多线程编程
深入了解Flutter中的多线程编程 前言一、为什么需要多线程?二、在Flutter中创建线程三、多线程的最佳实践四、Flutter中的多线程示例五、Flutter中的多线程错误处理六、Flutter中的多线程性能优化七、安全性和隐私考虑八、跨平台性考虑 总结 前言 在移动应用开发领域…...

如何限制word文件中部分内容无法编辑
工作中我们经常会用到Word制作一些文件,文件中有一部分内容不想他人编辑,我们可以设置限制编辑,可以对一部分内容设置限制编辑,具体方法如下: 我们将需要将可以编辑的地方选中,然后打开限制编辑功能 然后勾…...

免疫球蛋白介绍
免疫球蛋白(Immunoglobulin,Ig)是广泛存在于哺乳动物血清、淋巴液、组织液和外分泌液中的一种具有抗体活性或化学结构与抗体相似的球蛋白,在机体防御疾病的重要成分在疾病研究、药物研发、疫苗评价中具有重要作用。抗体࿰…...

VMWare 安装CentOS7镜像
安装CentOS 7 整个安装过程分两大步,第一步装机器,第二步装系统. 第一步: 装机器 检查物理机虚拟化支持是否开启,需要进入到BIOS中设置,因各种电脑型号进入BIOS 方式不同,同学们自行查找对应品牌电脑如何进入BIOS 建…...

什么台灯最好学生晚上用?开学适合孩子学习的台灯
作为学龄期儿童的家长,最担心的就是孩子长时间学习影响视力健康。无论是上网课、写作业、玩桌游还是陪伴孩子读绘本,都需要一个足够明亮的照明环境,因此选购一款为孩子视力发展保驾护航的台灯非常重要。推荐五款适合孩子学习的台灯。 1. 书客…...
6.1 C/C++ 封装字符串操作
C/C语言是一种通用的编程语言,具有高效、灵活和可移植等特点。C语言主要用于系统编程,如操作系统、编译器、数据库等;C语言是C语言的扩展,增加了面向对象编程的特性,适用于大型软件系统、图形用户界面、嵌入式系统等。…...

小白网络安全学习手册
作为一个合格的网络安全工程师,应该做到攻守兼备,毕竟知己知彼,才能百战百胜。 谈起黑客,可能各位都会想到:盗号,其实不尽然;黑客是一群喜爱研究技术的群体,在黑客圈中,一…...

思科拟推出PuzzleFS驱动,采用Rust语言开发
据了解,PuzzleFS宣称是“下一代 Linux 容器文件系统”,并使用Rust语言编写,具有“快速镜像构建”、“直接挂载支持”、“内存安全保证”等功能mroeoyw。 Multiable万达宝制造ERP(www.multiable.com.cn/solutions_zz)支持自定义栏位,并智能制…...

为什么要学习python
Python 越来越火爆 Python 在诞生之初,因为其功能不好,运转功率低,不支持多核,根本没有并发性可言,在计算功能不那么好的年代,一直没有火爆起来,甚至很多人根本不知道有这门语言。 随着时代的…...

相机噪声评估
当拥有一个相机,并且写了一个降噪的算法,想要测试降噪的应用效果。 相机在光线不足的情况下产生噪点的原因主要与以下几个因素有关: 感光元件的工作原理:相机的图像传感器是由数百万甚至数千万的感光元件(如CMOS或CC…...

CRM系统:快速实现外勤出差人员远程访问企业提升工作效率!
🎬 鸽芷咕:个人主页 🔥 个人专栏:《速学数据结构》 《C语言进阶篇》 ⛺️生活的理想,就是为了理想的生活! 文章目录 快速实现外勤出差人员远程访问企业CRM系统前言1. 无需公网IP,高效低成本实现CRM系统远程访问1.1 下…...

028.Python面向对象_类补充_元类
我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈 入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈 虚 拟 环 境 搭 建 :👉&…...
cocos2d-x Android原生平台与Lua交互
版本: cocos2d-x 语言: C/Java/Lua 简介 cocos2d-x原生平台Android 接入第三方SDK, 需要了解LuaJavaBridge的使用。 它封装了用于Java和Lua的相互调用, 其调用通过C为中介,简要的流程: Lua调用Java: Lua -> C -> Java J…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...

Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...

【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...

C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...

selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...