Redis消息队列与thinkphp/queue操作
业务场景
场景一
用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。
遇到两个问题:
- 由于代码是串行方式,流程大致为:开启数据库事务回滚->数据入库准备->发邮件->发实时消息->推送第三方平台->提交写入数据库。但是后续的3个步骤任意一个流程出了问题都会影响用户的注册结果。
- 发送邮件使用的不是成熟的第三方产品,而是利用
phpmaile自写代码实现的,然而这个过程耗时相对较长且偶尔有失败的情况;另外通过接口推送注册用户数据到的第三方平台是一个国外的产品接口通讯时间很长且一样有失败的情况。
以上两个问题就会导致用户的注册交互流程时间很长产品体验感非常差;且发送邮件、发送消息、推送数据任意一个步骤由于特殊情况导致执行失败都不能终止用户注册这样就只能通过日志捕获相应的失败情况。
场景二
用户在Shopify平台(一个跨境电商平台)付款下单后,商家会将订单同步到我的系统中,在我的系统中完成询价、报价、付款后我需要再将订单数据推送到第三方配货发货的平台。平台发货完成后通过设置好的回调地址通知我的系统发货的物流信息数据,我需要将物流信息数据存入到我的数据库后再将物流信息同步给Shopify平台用以展示给真实下单用户查看物流轨迹。
遇到一个问题:
- 正常情况下的回调代码逻辑是将物流信息写入数据库,再同步物流数据给Shopify。但是由于各种原因后者(同步物流数据给Shopify)有一定概率会失败。
这样就出现了我系统内成功展示了物流信息而Shopify反馈没有成功同步物流轨迹的订单出现。而回调又是一次性的我只能自查数据库进行回补。
英雄登场(消息队列Redis)
官方介绍:消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。
Redis安装
我用的宝塔安装的方便快捷,软件商品搜索Redis然后点击系统对应php版本的立即前往

再后续弹窗中安装redis扩展即可

后续Redis中的队列数据也可以通过宝塔进行查看:

thinkphp/queue
扩展这个内置了 Redis、Database、Topthink、Sync四种驱动,这里我用的Redis。think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作。
thinkphp/queue引入扩展
composer require topthink/think-queue
thinkphp/queue配置文件
我使用的是TP5要再application/extra目录下新增queue.php文件,文件内容如下(视各自情况调整哈):
return ['connector' => 'Redis', // 驱动类型'expire' => 60, // 任务的过期时间,默认为60秒; 如果任务执行时间超过此时间将会被认为是过期,将不会被执行'default' => 'default', // 默认的队列名称'host' => '127.0.0.1', // Redis 主机地址'port' => 6379, // Redis 端口'password' => '', // Redis 密码'select' => 0, // 使用哪一个 Redis 数据库'timeout' => 0, // 连接超时时间'persistent' => false, // 是否长连接
];
解决方案(注册部分)
引入消息队列后就是将原来串行方式改为并行,用户注册逻辑代码中关于后三个步骤只要单纯的推送队列即可。而后三者采用并行方式(也就是异步)执行对应的逻辑。这样既提高了注册的速度又可以通过队列将出错的数据多次执行提高成功率
注册逻辑代码
public static function doSaveRegister($postParam){db()->startTrans();try {$first_name = trim(outputstr($postParam, "first_name"));$last_name = trim(outputstr($postParam, "last_name"));$email = trim(outputstr($postParam, "email"));$password = trim(outputstr($postParam, "password"));//注册部分就展示部分代码了$info = new UserModel();$info->id = Uuid::uuid4();$info->number = createUserNumber();$info->short_name = substr($email, 0, strripos($email, "@"));$info->email = $email;if ($info->save() === false) {throw new Exception('Operation error!');}//发送用户注册成功的问候邮件,将要发送邮件的邮箱推送到消息队列$result = RedisUtils::redisQueueSendForSendRegisterWelcomeEmail('common\job\UserRegisterJob@sendRegisterSuccessWelcomeEmail', $email);if ($result['success'] === false) {throw new Exception('redis queue error!');}//这里只展示发送邮件的代码示例其他都是一样的道理db()->commit();$result = result_success('Register successful!');} catch (Exception $e) {db()->rollback();$result = result_error($e->getMessage());}return $result;}
推送发送邮件消息队列(生产者)
/*** 用户注册成功需要发送问候邮件的用户数据加入队列* @param string $job 处理该任务的任务名* @param string $data 加入队列的数据-邮箱号* @param string $queue_name 队列名,可以不写*/public static function redisQueueSendForSendRegisterWelcomeEmail($job, $data, $queue_name = 'user_register_email'){//此处做了延时推送,原因是邮件服务是自己写程序实现的避免高并发导致发送失败,所以延时推送一下$isPushed = Queue::later(5, $job, $data, $queue_name);if ($isPushed !== false) {$result = result_success('队列加入成功');} else {$result = result_error('队列加入失败');}return $result;}
消息队列处理逻辑(消费者)
<?phpnamespace common\job;use common\utils\email\EmailUtils;
use common\utils\gateway\GatewaysUtils;
use common\utils\log\LoggerUtils;
use common\utils\systemMessage\SystemMessageUtils;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use think\queue\Job;/*** 处理所有用户注册方面的队列数据,代码逻辑写在这里,运行方式是命令行执行的* Class UserRegister* @package common\job*/
class UserRegisterJob
{/*** 处理发送用户注册成功邮件的队列* @param Job $job* @param string $data 要发送邮件的邮箱*/public function sendRegisterSuccessWelcomeEmail(Job $job, $data){$result = EmailUtils::sendUserRegisterSuccessEmail($data);if ($result['success'] === false) {//判断一下发送失败的次数,超过3次剔除队列$attempts = $job->attempts();if ($attempts > 3) {//发送失败,写进日志,邮件通知开发者$message = '新用户注册发送问候邮件失败,程序错误内容:' . $result['msg'] . ',数据源:' . $data;LoggerUtils::systemErrorLog()->info($message);EmailUtils::sendSystemErrorEmailToDeveloper($message);$job->delete();}} else {//发送成功,剔除队列$job->delete();}}
}
启动队列监听
进入项目根目录执行
php think queue:work --queue 队列名1,队列名2
多个队列可以用逗号拼接一次性监听
这个进行一般都要后台运行且开机自启动,自己写的脚本如下:
#!/bin/bash
#启动Redis队列监听
cd /www/wwwroot/english-e-commerce/ && php think queue:listen --queue user_register_email,user_register_workman_message,sync_user_to_tidio,order_sync_mabang_track,fulfillment_shopify_order &
开机启动方法根据不同linux系统有很多种此处不做记录
不喜勿喷,也是初学。记录一下方便后面查找
参考链接
- ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
- 消息队列使用的四种场景介绍
相关文章:
Redis消息队列与thinkphp/queue操作
业务场景 场景一 用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。 遇到两个问题: 由于代码是串行方式,流程大致为:开…...
【Ubuntu】常用命令
一般操作 pwd(present working directory) 显示当前的工作目录/路径。 cd (change directory) 改变目录,用于输入需要前往的路径/目录。 有一些特殊命令也很常用 : 解释 前往同一级的另一个目录 cd ../directory name cd .. 表示进入上…...
稀碎从零算法笔记Day22-LeetCode:
题型:链表 链接:2. 两数相加 - 力扣(LeetCode) 来源:Leet 题目描述 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 …...
Nacos下载和安装
(1)下载地址和版本 下载地址:Releases alibaba/nacos GitHub 解压在没有中文及空格的文件夹 (2)启动nacos服务 在bin目录下,打开命令行,输入 启动命令:sh startup.sh -m standalone - Linux/Unix/Mac …...
pandas简介(python)
pandas是什么 Pandas 是一个开源的第三方 Python 库,从 Numpy 和 Matplotlib 的基础上构建而来,享有数据分析“三剑客之一”的盛名(NumPy、Matplotlib、Pandas)。Pandas 已经成为 Python 数据分析的必备高级工具,它的…...
个人网站制作 Part 13 添加搜索功能[Elasticsearch] | Web开发项目
文章目录 👩💻 基础Web开发练手项目系列:个人网站制作🚀 添加搜索功能🔨使用Elasticsearch🔧步骤 1: 安装Elasticsearch🔧步骤 2: 配置Elasticsearch🔧步骤 3: 创建索引 …...
Springboot+vue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。
演示视频: Springbootvue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。 项目介绍: 采用M(model)V(view)C(controller)三层…...
vue3 + vite 实现一个动态路由加载功能
假设后端返回的格式是这样子 {"menu": [{"path": "/admin","name": "adminLayout","redirect": "/admin/index","componentPath": "/layout/admin/index.vue","children&quo…...
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024)
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024) 大会主题: (主题包括但不限于, 更多主题请咨询会务组苏老师) 区块链: 区块链技术和系统 分布式一致性算法和协议 块链性能 信息储存系统 区块链可扩展性 区块…...
若依jar包运行脚本,从零到一:用Bash脚本实现JAR应用的启动、停止与监控
脚本使用说明: 启动应用:sh app.sh start停止应用:sh app.sh stop检查应用状态:sh app.sh status重启应用:sh app.sh restart 注意事项: 请确保你的系统上安装了 Java 环境,并且 ruoyi-admin…...
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib)
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib) 在 FreeBSD 系统上设置用户环境变量可以通过编辑用户的 Shell配置文件 来实现。 cshrc 与 csh_profile 的区别: cshrc: 每个脚本执行前都执行一遍这个脚本。 csh_profile: 根据不同使用者用户名, 会先去其 home…...
Apache Dolphinscheduler - 无需重启 Master-Server 停止疯狂刷日志解决方案
记录的是一个 3.0 比较难搞的问题,相信不少使用过 3.0 的用户都遇到过 Master 服务中存在一些工作流或者任务流一直不停的死循环的问题,导致疯狂刷日志。不过本人到现在也没找到最关键的触发原因,只是看到一些连锁反应带来的结果…… 影响因素…...
竞争优势:大型语言模型 (LLM) 如何重新定义业务策略
人工智能在内容创作中的突破 在当今快节奏的商业环境中,像 GPT-4 这样的大型语言模型 (LLM) 不再只是一种技术新颖性; 它们已成为重新定义跨行业业务战略的基石。 从增强客户服务到推动创新,法学硕士提供了企业不容忽视的竞争优势。 1. 加强…...
Spring AOP和AspectJ AOP区别
Spring AOP(Aspect-Oriented Programming)和 AspectJ AOP 是两种不同的 AOP 实现方式,它们在实现上有一些区别。下面是它们之间的主要区别: 基于代理 vs 字节码增强: Spring AOP: Spring AOP 是基于代理的…...
FREERTOS信号量详解
信号量是操作系统中重要的一部分,信号量一般用来进行资源管理和任务同步,资源管理其实就是用变量来标记现有资源的数量,任务同步其实就是用标志位来控制任务的先后执行顺序,这些概念在操作系统中以及裸机开发中都有所涉及。 FreeR…...
每天学习一个Linux命令之vim
每天学习一个Linux命令之vim Vim是一款功能强大的文本编辑器,在Linux系统中广泛使用。本篇博客将介绍一些常用的Vim命令及其选项,帮助您更好地使用Vim进行文本编辑。 命令及选项 以下是Vim的常用命令及其可用选项: 1. 打开文件 $ vim fi…...
linux环境部署
war包环境 在Linux系统上部署准备war包环境 查看linux当前版本和系统类型 [rootlocalhost ~]# uname -a Linux localhost.localdomain 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linuxlinux 打包文件夹 使用tar命令࿱…...
上位机图像处理和嵌入式模块部署(qmacvisual图像预处理)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 不管大家是在读书的时候学习的图像处理,还是在后来的工作中,重新学习了图像处理,相信大家对图像预处理的概念并…...
C语言内存函数详解
文章目录 前言一、memcpy函数(内存拷贝函数)二、memmove重叠拷贝函数三.memset内存设置函数四.memcmp内存比较函数总结 前言 我们之前按学习了C语言标准库中提供了一系列的字符和字符串库函数,接下来我们就学习一下关于内存相关的一些函数。…...
详解Redis的持久化RDB和AOF
Redis的持久化是将内存中的数据同步到硬盘的过程 具体来说,Redis支持两种主要的持久化方式:RDB 和 AOF。 RDB(Redis Database) 简介 默认持久化方式 RDB会将内存中的数据快照保存到磁盘上的一个二进制文件中。这个文件包含了…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
