Rabbitmq的几种工作模式
工具类
public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置PortconnectionFactory.setPort(5672);//4.设置账户和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//5.设置VirtualHostconnectionFactory.setVirtualHost("0517");return connectionFactory.newConnection();}
}
点对点(简单)的队列
图解:
生产者代码
public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel =connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}} }
消费者代码
public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列
工作(公平性)队列模式
图解:
生产者代码
public class Task01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {try(Channel channel=RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}} }
消费者代码
消费者1:
public class Consumer1 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
消费者2:
public class Consumer2 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
重点解析:
工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费
当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)
这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思
发布订阅模式
图解:
生产者代码
public class ProducerFanout {//定义交换机名称private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();//通道关联交换机(创建交换机)(fanout类型会自动创建)//channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);String msg = "程子强你好";channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());channel.close();connection.close();} }
消费者代码
消费者1:
public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "fanout_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
消费者2:
public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "fanout_email_sms";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
重点解析:
发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)
路由模式Routing
图解:
生产者代码
public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =connection.createChannel()) {//创建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug","调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}} }
消费者代码
消费者1:
public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();//写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "disk";channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("错误日志已经接收");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
消费者2:
public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "console";channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
重点解析:
路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key
通配符模式Topics(主题)
图解:
生产者代码
public class ProducerTopic {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);String msg = "我是xxx";channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());channel.close();connection.close();} }
消费者代码
消费者1:
public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "topic_sms_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
消费者2:
public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "topic_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("邮件消费者...");// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);} }
重点解析:
通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次
相关文章:

Rabbitmq的几种工作模式
工具类 public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置Po…...

如何在 Debian 上安装运行极狐GitLab Runner?【二】
极狐GitLab 是 GitLab 在中国的发行版,专门面向中国程序员和企业提供企业级一体化 DevOps 平台,用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规,而且所有的操作都是在一个平台上进行,省事省心省钱。可以一键安装极狐GitL…...

简单的docker学习 第13章 CI/CD与Jenkins(下)
第13章 CI/CD 与 Jenkins 13.13 自由风格的 CI 操作(最终架构) 前面的架构存在的问题是,若有多个目标服务器都需要使用该镜像,那么每个目标服务器都需要在本地构建镜像,形成系统资源浪费。若能够在 Jenkins 中将镜像相撞构建好并推送到 Har…...
基于STM32设计的智能鱼缸_带鱼儿数量视觉识别(华为云IOT)(202)
文章目录 一、前言1.1 项目介绍【1】项目功能介绍【2】设计实现的功能【3】项目硬件模块组成1.2 设计思路【1】整体设计思路【2】ESP8266工作模式配置【3】自动换水原理1.3 项目开发背景【1】选题的意义【2】可行性分析【3】参考文献1.4 开发工具的选择【1】设备端开发【2】上位…...

立体连接模式下的传播与沟通:AI智能名片小程序的创新应用与深度剖析
摘要:在数字化浪潮的推动下,信息传播与沟通方式正经历着前所未有的变革。立体连接模式,作为这一变革的重要产物,通过整合物理空间、虚拟网络空间与社群心理空间的三维联动,实现了信息的深度传播与高效互动。AI智能名片…...

基于Python的Scrapy爬虫的个性化书籍推荐系统【Django框架、超详细系统设计原型】
文章目录 有需要本项目的代码或文档以及全部资源,或者部署调试可以私信博主项目介绍系统分析系统设计展示总结 有需要本项目的代码或文档以及全部资源,或者部署调试可以私信博主 项目介绍 近年来,随着互联网的蓬勃发展,企事业单…...
二叉树bst
二叉搜索树的中序遍历结果有序 ,二叉搜索树性质,左小右大,二叉搜索树中序遍历的结果应该是从小到大的。 题目描述二叉树是从上到下,从左到右描述,并非前中后序中的一种。 99. 恢复二叉搜索树 class Solution:first …...

elasticsearch的使用(二)
DSL查询 Elasticsearch的查询可以分为两大类: 叶子查询(Leaf query clauses):一般是在特定的字段里查询特定值,属于简单查询,很少单独使用。 复合查询(Compound query clauses)&am…...

YOLOv8由pt文件中读取模型信息
Pytorch的pt模型文件中保存了许多模型信息,如模型结构、模型参数、任务类型、批次、数据集等 在先前的YOLOv8实验中,博主发现YOLOv8在预测时并不需要指定任务类型,因为这些信息便保存在pt模型中,那么,今天我们便来看看…...
js遍历效率
1w条数据,遍历效率 1、for 15s let t(new Date()).getTime()let a[]for(var i 0; i < 100000; i){a.push({id:i,val:i})}let ts[]for(var i 0; i < a.length; i){if(a[i].val!2 && a[i].val!4 && a[i].val!8){ts.push(a[i])}}let c(new D…...

QModbus例程分析
由于有一个Modebus上位机的需要,分析一下QModbus Slave的源代码,方便后面的开发。 什么是Modbus Modbus是一种常用的串行通信协议,被广泛应用于工业自动化领域。它最初由Modicon(目前属于施耐德电气公司)于1979年开发…...

Vue万字学习笔记(入门1)
目录 简介 Vue是什么 渐进式框架 单文件组件 API 风格 选项式 API (Options API) 组合式 API (Composition API) 创建一个 Vue 应用 挂载应用 DOM 中的根组件模板 应用配置 多个应用实例 模板语法 文本插值 原始 HTML Attribute 绑定 简写…...

Cesium手动建模模型用Cesiumlab转3D Tiles模型位置不对,调整模型位置至指定经纬度
Cesium加载3Dtiles模型的平移和旋转_3dtiles先旋转再平移示例-CSDN博客 Cesium 平移cesiumlab生产的3Dtiles切片模型到目标经纬度-CSDN博客 【ArcGISCityEngine】自行制作Lod1城市大尺度白膜数据_cityengine 生成指定坐标集指定区域的白模-CSDN博客 以上次ArcGISCityEngine制…...

学习C语言第23天(程序环境和预处理)
1. 程序的翻译环境和执行环境 在ANSIC的任何一种实现中,存在两个不同的环境 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 第2种是执行环境,它用于实际执行代码。 2. 详解编译链接 2.1 翻译环境 每个源文件单独经过编…...

Ubuntu22.04安装
使用Vmware安装好后 首先执行下面命令,不然每次打开终端会出现To run a command as administrator (user root)… touch ~/.sudo_as_admin_successful换源 参考 sudo cp /etc/apt/sources.list /etc/apt/sources.list.baksudo gedit /etc/apt/sources.list清空…...

从入门到自动化:一篇文章掌握Python的80%
Python作为一种高级编程语言,以其简洁明了的语法和强大的功能性,在全球编程社区内享有极高的声誉。本文将带领你从Python的基础语法入手,介绍其常用库的应用,以及如何将Python用于数据分析、网络爬虫和简单的自动化任务࿰…...
开源的主流机器学习框架
主流的开源机器学习框架包括: 1. TensorFlow:由Google开发和维护的深度学习框架,广泛用于生产环境和研究。支持多种平台,并具有丰富的工具和库支持。 2. PyTorch:由Facebook开发的深度学习框架,以其动态计…...
RabbitMQ:发送者的可靠性之配置发送者重试机制
文章目录 为什么需要重试机制?如何配置重试机制?测试重试机制使用重试机制的注意事项 在使用消息队列(MQ)系统时,网络故障是不可避免的问题,尤其是在与RabbitMQ等服务交互时。如果生产者在发送消息时遇到网…...

基于深度学习的大规模MIMO信道状态信息反馈
MIMO系统 MIMO系统利用多个天线在发送端和接收端之间建立多条独立的信道,从而使得同一时间可以传输多个数据流,从而使得同一之间可以传输多个数据流,提高数据传输速率。 优势 增加传输速率和容量,提高信号覆盖范围和抗干扰能力…...

在Docker中部署Rasa NLU服务
最近因为项目需要将rasa nlu配置到docker容器中供系统调用,本篇主要整理该服务的docker配置过程。 本篇的重点在于docker的使用,不在Rasa NLU。 系统环境:Ubuntu 18.04.6 1. Rasa介绍 Rasa是一个开源的机器学习框架,专为构建基于文…...

业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...

练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...