tp8 使用rabbitMQ(3)发布/订阅
发布/订阅
当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式
- 发布者(producer)是发布消息的应用程序。
- 队列(queue)用于消息存储的缓冲。
- 消费者(consumer)是接收消息的应用程序。
下面的图才是 rabbitmq 的完整模式, 中间是有交换机的

交换机
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
我们之前的 简单队列和工作队列中,没有提来交换机的概念。
默认交换机
当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "
交换机的种类有多种
直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了
$channel->basic_publish($msg, '', 'hello');
发布订阅模式中,我们使用 扇形交换机 fanout 代码如下
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称
因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)
交换机和队列的绑定(这里应该是在消费者代码中出现的)
我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中
$channel->queue_bind($queue_name, 'hello'); //这样就把队列名称和交换机名称做了绑定
下面的 完整的代码示例
生产者
<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQProduce extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqproduce')->setDescription('发布订阅模式的生产者');}protected function execute(Input $input, Output $output){//获取连接$connection = $this->getConnectRabbitMQ();//创建通道$channel = $connection->channel();//创建交换机/*** params exchange 自定义交换机名称* params type 交换机的类型, 一般都会使用 扇形(fanout)* params passive 是否消极声名* params durable 是否持久化* params auto_delete 是否自动删除* params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式* params nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行*/$channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);//现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)for ($i = 0; $i < 20; $i++) {$msgArr = ["name"=>"haha".$i,"age"=>'10'.$i,"sex"=>"female".$i];$msg = new AMQPMessage(json_encode($msgArr),["delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);sleep(1);$channel->basic_publish($msg,"exchangeName");}$channel->close();$connection->close();}protected function getConnectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}
消费者
<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQConsumer extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqconsumer')->setDescription('发布订阅模式的消费者');}protected function execute(Input $input, Output $output){$connection = $this->connectRabbitMQ();$channel = $connection->channel();//创建两个队列$channel->queue_declare("queueName1",false,false,false,false,false);$channel->queue_declare("queueName2",false,false,false,false,false);//绑定交换机和队列,交换机的名称是在生产者中定义的$channel->queue_bind("queueName1","exchangeName");$channel->queue_bind("queueName2","exchangeName");//设置消息处理函数$callback1 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(显示)处理数据的队列NO1 ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //这里让就是消息的应答了};$callback2 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(保存)处理数据的队列NO2 ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //这里让就是消息的应答了};$channel->basic_consume("queueName1","",false,false,false,false,$callback1);$channel->basic_consume("queueName2","",false,false,false,false,$callback2);while(count($channel->callbacks)){$channel->wait();}}protected function connectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}相关文章:
tp8 使用rabbitMQ(3)发布/订阅
发布/订阅 当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模…...
【nlp】3.4 Transformer论文复现:2. 编码器部分(规范化层、子层连接结构、编码器层)
3.4 Transformer论文复现:2. 编码器部分(规范化层、子层连接结构、编码器层) 2.6 规范化层2.6.1 规范化层的作用2.6.2 规范化层的代码实现2.6.3 规范化层总结2.7 子层连接结构2.7.1 子层连接结构2.7.2 子层连接结构的代码实现2.7.3 子层连接结构总结2.8 编码器层2.8.1 编码器…...
面试:ShardingSphere问题
文章目录 什么是ShardingSphere,它的主要功能是什么?ShardingSphere的核心模块有哪些?他们是如何工作的?ShardingSphere 的读写分离是如何实现的?如何配置ShardingSphere的数据分片策略?ShardingSphere支持…...
NX二次开发UF_CURVE_ask_offset_direction_2 函数介绍
文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_offset_direction_2 Defined in: uf_curve.h int UF_CURVE_ask_offset_direction_2(UF_STRING_p_t input_curves, double offset_direction_vector [ 3 ] , double dra…...
【研究中】sql server权限用户设置23.11.26
--更新时间2023.11.26 21:30 负责人:jerrysuse DBAliCMSIF EXISTS (select * from sysobjects where namehkcms_user)--判断是否存在此表DROP TABLE hkcms_user CREATE TABLE hkcms_user (id int primary key identity(1, 1),username char(32) NOT N…...
java多线程一
1、什么是线程 线程(Thread)是一条程序内部的一条执行流程。 程序中如果只有一条执行流程,那这个程序就是单线程的程序。 2、什么是多线程 多线程(multithreading),是指从软件或者硬件上实现多个线程并发执…...
电脑技巧:电脑常见蓝屏、上不了网等故障及解决办法
目录 一、电脑蓝屏 常见原因1: 病毒木马 常见原因2: 安装了不兼容的软件 二、电脑不能上网 常见原因1: 新装系统无驱动 常见原因2: DNS服务器异常 常见原因3: 硬件问题 三、电脑没声音 常见原因1: 未安装驱动 常见原因2: 硬件故障 四、电脑屏幕不显示 常见原因1: 显…...
大语言模型损失函数详解
我们可以把语言模型分为两类: 自动回归式语言模型:自动回归式语言模型在本质上是单向的,也就是说,它只沿着一个方向阅读句子。正向(从左到右)预测;反向(从右到左)预测。…...
Spring Boot 3 集成 Knife4j
基础环境 SpringBoot : 3.0.6 Java: jdk-17.0.5 Maven: 3.6.1依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xs…...
BetaFlight模块设计之三十六:SoftSerial
BetaFlight模块设计之三十六:SoftSerial 1. 源由2. API接口2.1 openSoftSerial2.2 onSerialRxPinChange2.3 onSerialTimerOverflow2.4 processTxState2.5 processRxState 3. 辅助函数3.1 applyChangedBits3.2 extractAndStoreRxByte3.3 prepareForNextRxByte 4. 总结…...
PC访问华为昇腾开发板的摸索过程
作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 最近要折腾华为昇腾开发板(官方名称叫:Atlas 200I DK)。先是按照官方教程折腾:Atlas200DK环境部署。我发现…...
C++学习之路(六)C++ 实现简单的工具箱系统命令行应用 - 示例代码拆分讲解
简单的工具箱系统示例介绍: 这个示例展示了一个简单的工具箱框架,它涉及了几个关键概念和知识点: 面向对象编程 (OOP):使用了类和继承的概念。Tool 是一个纯虚类,CalculatorTool 和 FileReaderTool 是其派生类。 多态࿱…...
redis运维(十四) hash缓存案例
一 缓存案例 ① 需求 ② 个人理解 策略:不更新缓存,而是删除缓存大部分观点认为:1、做缓存不应该是去更新缓存,而是应该删除缓存2、然后由下个请求去缓存,发现不存在后再读取数据库,写入redis缓存 高并发场景下,到底先更新缓存还是先更…...
Rust UI开发(三):iced如何打开图片(对话框)并在窗口显示图片?
注:此文适合于对rust有一些了解的朋友 iced是一个跨平台的GUI库,用于为rust语言程序构建UI界面。 这是一个系列博文,本文是第三篇,前两篇的链接: 1、Rust UI开发(一):使用iced构建…...
网络爬虫(Python:Requests、Beautiful Soup笔记)
网络爬虫(Python:Requests、Beautiful Soup笔记) 网络协议简要介绍一。OSI参考模型二、TCP/IP参考模型对应关系TCP/IP各层实现的协议应用层传输层网络层 HTTP协议HTTP请求HTTP响应HTTP状态码 Requests(Python)Requests…...
【Kotlin】内联函数
文章目录 内联函数noinline: 避免参数被内联非局部返回使用标签实现Lambda非局部返回为什么要设计noinline crossinline具体化参数类型 Kotlin中的内联函数之所以被设计出来,主要是为了优化Kotlin支持Lambda表达式之后所带来的开销。然而,在Java中我们似…...
Unity技美35——再URP管线环境下,配置post后期效果插件(post processing)
前两年在我的unity文章第10篇写过,后效滤镜的使用,那时候大部分项目用的还是unity的基础管线,stander管线。 但是现在随着unity的发展,大部分项目都用了URO管线,甚至很多PC端用的都是高效果的HDRP管线,这就…...
Redis:持久化RDB和AOF
目录 概述RDB持久化流程指定备份文件的名称指定备份文件存放的目录触发RDB备份redis.conf 其他一些配置rdb的备份和恢复优缺点停止RDB AOF持久化流程AOF启动/修复/恢复AOF同步频率设置rewrite压缩原理触发机制重写流程no-appendfsync-on-rewrite 优缺点 如何选择 概述 Redis是…...
基于python协同过滤推荐算法的音乐推荐与管理系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 基于Python的协同过滤推荐算法的音乐推荐与管理系统是一个集成了音乐推荐和管理的系统,它使用协同过滤算…...
【极客技术】真假GPT-4?微调 Llama 2 以替代 GPT-3.5/4 已然可行!
近日小编在使用最新版GPT-4-Turbo模型(主要特点是支持128k输入和知识库截止日期是2023年4月)时,发现不同商家提供的模型回复出现不一致的情况,尤其是模型均承认自己知识库达到2023年4月,但当我们细问时,Fak…...
网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
