Rabbitmq的几种工作模式
工具类
public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置PortconnectionFactory.setPort(5672);//4.设置账户和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//5.设置VirtualHostconnectionFactory.setVirtualHost("0517");return connectionFactory.newConnection();}
}
点对点(简单)的队列
图解:
生产者代码
public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel =connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}} }
消费者代码
public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列
工作(公平性)队列模式
图解:
生产者代码
public class Task01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {try(Channel channel=RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}} }
消费者代码
消费者1:
public class Consumer1 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
消费者2:
public class Consumer2 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
重点解析:
工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费
当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)
这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思
发布订阅模式
图解:
生产者代码
public class ProducerFanout {//定义交换机名称private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();//通道关联交换机(创建交换机)(fanout类型会自动创建)//channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);String msg = "程子强你好";channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());channel.close();connection.close();} }
消费者代码
消费者1:
public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "fanout_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
消费者2:
public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "fanout_email_sms";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
重点解析:
发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)
路由模式Routing
图解:
生产者代码
public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =connection.createChannel()) {//创建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug","调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}} }
消费者代码
消费者1:
public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();//写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "disk";channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("错误日志已经接收");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
消费者2:
public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "console";channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
重点解析:
路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key
通配符模式Topics(主题)
图解:
生产者代码
public class ProducerTopic {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);String msg = "我是xxx";channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());channel.close();connection.close();} }
消费者代码
消费者1:
public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "topic_sms_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
消费者2:
public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "topic_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("邮件消费者...");// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
重点解析:
通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次
相关文章:

Rabbitmq的几种工作模式
工具类 public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置Po…...

如何在 Debian 上安装运行极狐GitLab Runner?【二】
极狐GitLab 是 GitLab 在中国的发行版,专门面向中国程序员和企业提供企业级一体化 DevOps 平台,用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规,而且所有的操作都是在一个平台上进行,省事省心省钱。可以一键安装极狐GitL…...

简单的docker学习 第13章 CI/CD与Jenkins(下)
第13章 CI/CD 与 Jenkins 13.13 自由风格的 CI 操作(最终架构) 前面的架构存在的问题是,若有多个目标服务器都需要使用该镜像,那么每个目标服务器都需要在本地构建镜像,形成系统资源浪费。若能够在 Jenkins 中将镜像相撞构建好并推送到 Har…...
基于STM32设计的智能鱼缸_带鱼儿数量视觉识别(华为云IOT)(202)
文章目录 一、前言1.1 项目介绍【1】项目功能介绍【2】设计实现的功能【3】项目硬件模块组成1.2 设计思路【1】整体设计思路【2】ESP8266工作模式配置【3】自动换水原理1.3 项目开发背景【1】选题的意义【2】可行性分析【3】参考文献1.4 开发工具的选择【1】设备端开发【2】上位…...

立体连接模式下的传播与沟通:AI智能名片小程序的创新应用与深度剖析
摘要:在数字化浪潮的推动下,信息传播与沟通方式正经历着前所未有的变革。立体连接模式,作为这一变革的重要产物,通过整合物理空间、虚拟网络空间与社群心理空间的三维联动,实现了信息的深度传播与高效互动。AI智能名片…...

基于Python的Scrapy爬虫的个性化书籍推荐系统【Django框架、超详细系统设计原型】
文章目录 有需要本项目的代码或文档以及全部资源,或者部署调试可以私信博主项目介绍系统分析系统设计展示总结 有需要本项目的代码或文档以及全部资源,或者部署调试可以私信博主 项目介绍 近年来,随着互联网的蓬勃发展,企事业单…...
二叉树bst
二叉搜索树的中序遍历结果有序 ,二叉搜索树性质,左小右大,二叉搜索树中序遍历的结果应该是从小到大的。 题目描述二叉树是从上到下,从左到右描述,并非前中后序中的一种。 99. 恢复二叉搜索树 class Solution:first …...

elasticsearch的使用(二)
DSL查询 Elasticsearch的查询可以分为两大类: 叶子查询(Leaf query clauses):一般是在特定的字段里查询特定值,属于简单查询,很少单独使用。 复合查询(Compound query clauses)&am…...

YOLOv8由pt文件中读取模型信息
Pytorch的pt模型文件中保存了许多模型信息,如模型结构、模型参数、任务类型、批次、数据集等 在先前的YOLOv8实验中,博主发现YOLOv8在预测时并不需要指定任务类型,因为这些信息便保存在pt模型中,那么,今天我们便来看看…...
js遍历效率
1w条数据,遍历效率 1、for 15s let t(new Date()).getTime()let a[]for(var i 0; i < 100000; i){a.push({id:i,val:i})}let ts[]for(var i 0; i < a.length; i){if(a[i].val!2 && a[i].val!4 && a[i].val!8){ts.push(a[i])}}let c(new D…...

QModbus例程分析
由于有一个Modebus上位机的需要,分析一下QModbus Slave的源代码,方便后面的开发。 什么是Modbus Modbus是一种常用的串行通信协议,被广泛应用于工业自动化领域。它最初由Modicon(目前属于施耐德电气公司)于1979年开发…...

Vue万字学习笔记(入门1)
目录 简介 Vue是什么 渐进式框架 单文件组件 API 风格 选项式 API (Options API) 组合式 API (Composition API) 创建一个 Vue 应用 挂载应用 DOM 中的根组件模板 应用配置 多个应用实例 模板语法 文本插值 原始 HTML Attribute 绑定 简写…...

Cesium手动建模模型用Cesiumlab转3D Tiles模型位置不对,调整模型位置至指定经纬度
Cesium加载3Dtiles模型的平移和旋转_3dtiles先旋转再平移示例-CSDN博客 Cesium 平移cesiumlab生产的3Dtiles切片模型到目标经纬度-CSDN博客 【ArcGISCityEngine】自行制作Lod1城市大尺度白膜数据_cityengine 生成指定坐标集指定区域的白模-CSDN博客 以上次ArcGISCityEngine制…...

学习C语言第23天(程序环境和预处理)
1. 程序的翻译环境和执行环境 在ANSIC的任何一种实现中,存在两个不同的环境 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 第2种是执行环境,它用于实际执行代码。 2. 详解编译链接 2.1 翻译环境 每个源文件单独经过编…...

Ubuntu22.04安装
使用Vmware安装好后 首先执行下面命令,不然每次打开终端会出现To run a command as administrator (user root)… touch ~/.sudo_as_admin_successful换源 参考 sudo cp /etc/apt/sources.list /etc/apt/sources.list.baksudo gedit /etc/apt/sources.list清空…...

从入门到自动化:一篇文章掌握Python的80%
Python作为一种高级编程语言,以其简洁明了的语法和强大的功能性,在全球编程社区内享有极高的声誉。本文将带领你从Python的基础语法入手,介绍其常用库的应用,以及如何将Python用于数据分析、网络爬虫和简单的自动化任务࿰…...
开源的主流机器学习框架
主流的开源机器学习框架包括: 1. TensorFlow:由Google开发和维护的深度学习框架,广泛用于生产环境和研究。支持多种平台,并具有丰富的工具和库支持。 2. PyTorch:由Facebook开发的深度学习框架,以其动态计…...
RabbitMQ:发送者的可靠性之配置发送者重试机制
文章目录 为什么需要重试机制?如何配置重试机制?测试重试机制使用重试机制的注意事项 在使用消息队列(MQ)系统时,网络故障是不可避免的问题,尤其是在与RabbitMQ等服务交互时。如果生产者在发送消息时遇到网…...

基于深度学习的大规模MIMO信道状态信息反馈
MIMO系统 MIMO系统利用多个天线在发送端和接收端之间建立多条独立的信道,从而使得同一时间可以传输多个数据流,从而使得同一之间可以传输多个数据流,提高数据传输速率。 优势 增加传输速率和容量,提高信号覆盖范围和抗干扰能力…...

在Docker中部署Rasa NLU服务
最近因为项目需要将rasa nlu配置到docker容器中供系统调用,本篇主要整理该服务的docker配置过程。 本篇的重点在于docker的使用,不在Rasa NLU。 系统环境:Ubuntu 18.04.6 1. Rasa介绍 Rasa是一个开源的机器学习框架,专为构建基于文…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...

XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...