【MQ02】基础简单消息队列应用
基础简单消息队列应用
在上一课中,我们已经学习到了什么是消息队列,有哪些消息队列,以及我们会用到哪个消息队列。今天,就直接进入主题,学习第一种,最简单,但也是最常用,最好用的消息队列模式。
最简单的队列功能
最简单的队列功能,无非就是将我们在数据结构与算法中学过的那个队列结构,变成一个外部功能组件。让各种语言和各种应用程序都可以通过这个队列来进行数据操作。这样的一个队列系统就称之为“消息队列中间件”。
一般,我们会将生产消息的程序,或者说,将数据放入到队列的一方称为 P (生产者,Producer);然后将队列称为Q(Queue);最后,将守候在队列前,等待从队列中获取数据的应用、程序或者代码段称为 C(消费者/客户端,Consumer)。

这是 RabbitMQ 官网手册上的图,后面的相关图示我们也将直接使用它们的。在这个图中,有字母的部分就不多解释了。中间红色的一格一格的部分代表的就是 Q 。
通常来说,P 只管将数据放到 Q 中,Q 负责中间存储数据,然后 C 取出数据。数据的进出方式遵循经典数据结构队列中的规则,也就是 FIFO 先进先出。
是不是就和我们之前说过的一样,最简单的理解,它就是将队列这个数据结构抽成了一个第三方组件。这样就可以跨平台、跨应用、跨语言地进行数据存取了。
RebbitMQ 实现
好了,先来看 RabbitMQ 的实现。你需要先安装好 RabbitMQ ,我这里是使用的 Docker 安装的。
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management 然后,也要安装好 PHP 的 Composer 组件,用于操作 RabbitMQ 消息队列,PHP 需要开启 sockets 扩展。
composer require php-amqplib/php-amqplib 5672 是 RabbitMQ 的服务端口,15672 则是它自带的一个管理工具的访问端口。具体的内容大家可以到官方文档中进行更加深入的学习。当然,也可以使用虚拟机方式来搭建测试环境,这个大家看自己的喜好吧。
接下来,我们先实现 P 端的代码,也就是生产者向消息队列中添加数据。
// 2.rq.p.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 定义队列
$channel->queue_declare('hello', false, false, false, false);// 创建消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // 将消息放入队列中echo "生产者向消息队列中发送信息:Hello World!";$channel->close();
$connection->close(); 注释很清晰了吧,如果你之前没有用 PHP 操作过 RabbitMQ 的话,那么直接复制这段代码就可以了。其中比较特殊的是 channel ,它是共享单个 TCP 连接的轻量级信道。这个概念可能是 RabbitMQ 相较于其它消息队列系统比较特别的。然后就是消息内容是通过一个 AMQPMessage 对象承载的,这个 AMQP 其实就是 RabbitMQ 的核心协议。协议是通信双方能够相互看明白对方的基础,能够建立通信的基础,就像我们之前在 Redis 中学习过的 RESP 协议一样。这里大家只需要知道 RabbitMQ 是使用这个协议就好了,而且它也支持其它的一些协议。发送完消息之后,记得关闭连接哦。
好了,接下来是我们的消费者/客户端实现。
// 2.rq.c.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 定义队列
$channel->queue_declare('hello', false, false, false, false);echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;// 定义接收数据的回调函数
$callback = function ($msg) {echo '接收到数据: ', $msg->body, PHP_EOL;
};
// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('hello', '', false, true, false, false, $callback);// 频道是开启状态时,挂起程序,不停地执行
while ($channel->is_open()) {// 等待并监听频道中的队列信息// 发现上方 basic_consume 定义的队列有消息后// 就调用它对应的 callback$channel->wait();
} 看着好像多很多东西呀?其实上面一半和生产者是一样的。从中间开始,我们使用的是 Channel 对象的 basic_consume() 方法,这个方法最后有一个回调函数参数。然后在下面通过 wait() 方法持续监听队列中是否有数据。如果有数据了,就调用指定的回调函数。并将消息内容交给回调函数的参数。
注意哦,一般来说,消息队列的消费者,或者说是客户端,或者说是 C 端。大部分情况下可能都会是这样通过一个死循环挂起的。目的就是当我们运行起程序之后,可以不停地,不间断地一直处理队列中的消息数据。之前在学习 Swoole 时,另外如果你学习过 Go 语言的话,也会发现它们的 Http 服务中也是有类似的死循环代码来实现服务端挂起的。这个大家可以到我的 Swoole 系列中看看哦。
测试一下吧,运行一下生产者代码。
➜ source git:(master) ✗ php 2.rq.p.php
生产者向消息队列中发送信息:Hello World!% 执行结束了,只是输出了一句话,没啥别的效果。那么我们再来运行一下消费者代码。
> php 2.rq.c.php
等待消息,或者使用 Ctrl+C 退出程序。
接收到数据: Hello World! 可以看到,消费者先是输出了接收到的数据,这个数据其实是上一步我们运行生产者插入到队列中的数据。现在,这条数据打印出来了,其实就是相当于已经被我们消费了。当然,在实际业务中,你可能会对这些数据进行更复杂的业务操作。但在演示时,我这里只是打印了一下。然后,消费者会继续挂在这里等待下一条消息的到来。这时,你可以再次运行生产者代码,然后就会看到消费者这边直接就已经消费了。
Redis 实现
对于 Redis 的实现,其实非常简单,我们之前也已经学过的,那就是使用 List 这个数据结构。先来看生产者,直接 push 数据就好了。
// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);echo "生产者向消息队列中发送信息:Hello World!";$redis->lpush('hello', 'Hello World!'); 消费者呢?当然就是我们之前已经学过的 pop 啦。
// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;while(1){$data = $redis->rpop('hello');if ($data){echo '接收到数据: ', $data, PHP_EOL;}
} 在这里,我们使用的是 lpush() + rpop() 的方式,当然你也可以使用 rpush() + lpop() 的方式,大家思考一下,如果是 lpush() + lpop() ,是实现的什么数据结构呢?
同样地,在 Redis 的消费者中,我们也需要通过一个死循环挂起消费者,然后不停地获取数据进行处理。剩下的测试过程就和上面的 RabbitMQ 一样了。
我的实践
之前我就说过,我的消息队列实践不多。唯一的业务场景下实现的高并发消息队列应用其实就是上面的 Redis 这两段代码。真的,核心就是这点东西。但为了抗高并发,我是使用的 Swoole ,生产者是在 Hyperf 框架中通过控制器接收到数据后,直接就放到 Redis 里。然后消费者就是一个命令行,接着开 100 个协程,将获取到的消息数据丢给协程处理。
业务应用是游戏的日志上报,最高并发 20000+ ,入库日志量 3000万+ 。使用的是阿里云最便宜的那个 Redis 服务,4G 大小单实例的那款。
是不是见到了消息队列的恐怖能力。这个量级,对于任何消息队列应用来说问题都不大,RabbitMQ 被认为是比较慢的,但是,它的处理能力是每秒几万次请求。而 Redis ,就是以 Redis 的读写性能为基础的,大概每秒11万的读和8万的写。这个在之前的 Redis 学习中都已经说过了。
总结
今天通过代码,我们其实就已经学习到了整个消息队列中最核心的内容。没错,消息队列就是这么地简单,但又这么地实用。我的业务例子其实是异步解耦的一种实现。而对于另一种常见的场景,秒杀,大家想一想,是不是也可以直接通过这样一个简单的队列就能够实现了。当然,可能还比较简陋,也需要考虑更多的东西,但是,一秒内处理几万条请求和一秒内让几万条请求入队,这个差别可大了去了。
其实,从队列的思想就可以看出,我们用数据库也可以实现队列,插入数据是入队,然后倒序查询出来一条就可以视为出队。但是呢,数据库的性能往往和专业的消息队列以及 NoSQL 工具都是有很大的差距的。因此,其实还是那句话,把握本质和思想,工具用啥都好说。
测试代码:
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php
相关文章:
【MQ02】基础简单消息队列应用
基础简单消息队列应用 在上一课中,我们已经学习到了什么是消息队列,有哪些消息队列,以及我们会用到哪个消息队列。今天,就直接进入主题,学习第一种,最简单,但也是最常用,最好用的消息…...
CTF CRYPTO 密码学-7
题目名称:敲击 题目描述: 让我们回到最开始的地方 0110011001101100011000010110011101111011011000110110010100110011011001010011010100110000001100100110001100101101001101000011100001100011001110010010110100110100011001000011010100110000…...
随机森林和决策树区别
随机森林(Random Forest)和决策树(Decision Tree)是两种不同的机器学习算法,其中随机森林是基于决策树构建的一种集成学习方法。以下是它们之间的主要区别: 决策树: 单一模型: 决策树是一种单一模型&#…...
新建VM虚拟机-安装centOS7-连接finalshell调试
原文 这里有问题 首先进入/etc/sysconfig/network-scripts/目录 cd /etc/sysconfig/network-scripts/ 然后编辑文件 ifcfg-ens33 vi ifcfg-ens33...
936. 戳印序列
Problem: 936. 戳印序列 文章目录 思路解题方法复杂度Code 思路 这道题目要求我们通过使用印章来印刷目标字符串,使得目标字符串最终变成全为’?‘的字符串。我们可以使用贪心的思想来解决这个问题。 首先,我们需要找到所有可以匹配印章的位置ÿ…...
20240129收获
今天终于发现《八部金刚功》第五部我一直做的是错的,嗨。这里这个写法非常聪明,创立的数组,以及用obj[key] item[key]这样的写法,这个写法充分展示了js常规写法中只有等号右边会去参与运算,等号左边就是普通的键的写法…...
【虚拟机数据恢复】异常断电导致虚拟机无法启动的数据恢复案例
虚拟机数据恢复环境: 某品牌R710服务器MD3200存储,上层是ESXI虚拟机和虚拟机文件,虚拟机中存放有SQL Server数据库。 虚拟机故障: 机房非正常断电导致虚拟机无法启动。服务器管理员检查后发现虚拟机配置文件丢失,所幸…...
vue3 + antd 封装动态表单组件(三)
传送带: vue3 antd 封装动态表单组件(一) vue3 antd 封装动态表单组件(二) 前置条件: vue版本 v3.3.11 ant-design-vue版本 v4.1.1 我们发现ant-design-vue Input组件和FormItem组件某些属性支持slot插…...
【算法专题】贪心算法
贪心算法 贪心算法介绍1. 柠檬水找零2. 将数组和减半的最少操作次数3. 最大数4. 摆动序列(贪心思路)5. 最长递增子序列(贪心算法)6. 递增的三元子序列7. 最长连续递增序列8. 买卖股票的最佳时机9. 买卖股票的最佳时机Ⅱ(贪心算法)10. K 次取反后最大化的数组和11. 按身高排序12…...
x-cmd pkg | sqlite3 - 轻量级的嵌入式关系型数据库
目录 简介首次用户 技术特点竞品和相关产品sqlite 与 x-cmd进一步阅读 简介 sqlite3 是一个轻量级的文件数据库,体积非常小,提供简单优雅而功能强大的 sql 化的数据查询。 通常情况下,sqlite 指的是 SQLite 2.x 版本,而 sqlite3 …...
LeetCode —— 43. 字符串相乘
😶🌫️😶🌫️😶🌫️😶🌫️Take your time ! 😶🌫️😶🌫️😶🌫️😶🌫️…...
PalWorld/幻兽帕鲁Ubuntu 22.04 LTS 一键部署脚本
上去就是干! 创建install.sh文件 #!/bin/bashsteam_usersteam log_path/tmp/pal_server.logif getent passwd "$steam_user" >/dev/null 2>&1; thenecho "User $steam_user exists." elseecho "User $steam_user does not exi…...
【Vue】Vue3.0样式隔离
在这里记录一下Vue3.0里面的样式隔离特性,在项目开发过程当中,有时候将样式单独提到了一个文件当中再引入到单组件文件当中,会导致没有样式隔离。 这里阅读Vue官方文档找到了解决办法。 一、scoped 我们了解到的最常见就是scopedÿ…...
Git初识
📙 作者简介 :RO-BERRY 📗 学习方向:致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 📒 日后方向 : 偏向于CPP开发以及大数据方向,欢迎各位关注,谢谢各位的支持 在学习Git之前我们先引入一…...
OpenHarmony隐藏应用(应用不在桌面显示,隐藏应用图标)
注意:此种方式是在OpenHarmony系统中生效 目录 一.找到UnsgnedReleasedProfileTemplate.json文件 二.修改 UnsgnedReleasedProfileTemplate.json文件 三.重新签名 一.找到UnsgnedReleasedProfileTemplate.json文件 什么是U...
2024年新提出的算法:(凤头豪猪优化器)冠豪猪优化算法Crested Porcupine Optimizer(附Matlab代码)
本次介绍一种新的自然启发式元启发式算法——凤头豪猪优化器(Crested Porcupine Optimizer,CPO)。该成果于2024年1月发表在中科院1区SCI top期刊Knowledge-Based Systems(IF 8.8)上。 1、简介 受到凤头豪猪(CP)各种…...
vue3 el-pagination 将组件中英文‘goto’ 修改 为 中文到‘第几’
效果如图: 要求:将英文中Go to 改为到第几 操作如下: <template><div class"paging"><el-config-provider :locale"zhCn"> // 注意:这是重要部分<el-pagination //分页组件根据官…...
【蓝桥杯日记】复盘篇二:分支结构
前言 本篇笔记主要进行复盘的内容是分支结构,通过学习分支结构从而更好巩固之前所学的内容。 目录 前言 目录 🍊1.数的性质 分析: 知识点: 🍅2.闰年判断 说明/提示 分析: 知识点: &am…...
Vulnhub靶机:hackme1
一、介绍 运行环境:Virtualbox(攻击机)和VMware(靶机) 攻击机:kali(192.168.56.106) 靶机:hackme1(192.168.56.107) 目标:获取靶机root权限和flag 靶机下载地址:htt…...
【C/C++ 06】基数排序
基数排序是桶排序的一种,算法思路为: 利用队列进行数据收发创建一个队列数组,数组大小为10,每个元素都是一个队列,存储取模为1~9的数从低位到高位进行数据收发,完成排序适用于数据位不高的情况(…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
