Redis消息队列与thinkphp/queue操作
业务场景
场景一
用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。
遇到两个问题:
- 由于代码是串行方式,流程大致为:开启数据库事务回滚->数据入库准备->发邮件->发实时消息->推送第三方平台->提交写入数据库。但是后续的3个步骤任意一个流程出了问题都会影响用户的注册结果。
- 发送邮件使用的不是成熟的第三方产品,而是利用
phpmaile自写代码实现的,然而这个过程耗时相对较长且偶尔有失败的情况;另外通过接口推送注册用户数据到的第三方平台是一个国外的产品接口通讯时间很长且一样有失败的情况。
以上两个问题就会导致用户的注册交互流程时间很长产品体验感非常差;且发送邮件、发送消息、推送数据任意一个步骤由于特殊情况导致执行失败都不能终止用户注册这样就只能通过日志捕获相应的失败情况。
场景二
用户在Shopify平台(一个跨境电商平台)付款下单后,商家会将订单同步到我的系统中,在我的系统中完成询价、报价、付款后我需要再将订单数据推送到第三方配货发货的平台。平台发货完成后通过设置好的回调地址通知我的系统发货的物流信息数据,我需要将物流信息数据存入到我的数据库后再将物流信息同步给Shopify平台用以展示给真实下单用户查看物流轨迹。
遇到一个问题:
- 正常情况下的回调代码逻辑是将物流信息写入数据库,再同步物流数据给Shopify。但是由于各种原因后者(同步物流数据给Shopify)有一定概率会失败。
这样就出现了我系统内成功展示了物流信息而Shopify反馈没有成功同步物流轨迹的订单出现。而回调又是一次性的我只能自查数据库进行回补。
英雄登场(消息队列Redis)
官方介绍:消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。
Redis安装
我用的宝塔安装的方便快捷,软件商品搜索Redis然后点击系统对应php版本的立即前往

再后续弹窗中安装redis扩展即可

后续Redis中的队列数据也可以通过宝塔进行查看:

thinkphp/queue
扩展这个内置了 Redis、Database、Topthink、Sync四种驱动,这里我用的Redis。think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作。
thinkphp/queue引入扩展
composer require topthink/think-queue
thinkphp/queue配置文件
我使用的是TP5要再application/extra目录下新增queue.php文件,文件内容如下(视各自情况调整哈):
return ['connector' => 'Redis', // 驱动类型'expire' => 60, // 任务的过期时间,默认为60秒; 如果任务执行时间超过此时间将会被认为是过期,将不会被执行'default' => 'default', // 默认的队列名称'host' => '127.0.0.1', // Redis 主机地址'port' => 6379, // Redis 端口'password' => '', // Redis 密码'select' => 0, // 使用哪一个 Redis 数据库'timeout' => 0, // 连接超时时间'persistent' => false, // 是否长连接
];
解决方案(注册部分)
引入消息队列后就是将原来串行方式改为并行,用户注册逻辑代码中关于后三个步骤只要单纯的推送队列即可。而后三者采用并行方式(也就是异步)执行对应的逻辑。这样既提高了注册的速度又可以通过队列将出错的数据多次执行提高成功率
注册逻辑代码
public static function doSaveRegister($postParam){db()->startTrans();try {$first_name = trim(outputstr($postParam, "first_name"));$last_name = trim(outputstr($postParam, "last_name"));$email = trim(outputstr($postParam, "email"));$password = trim(outputstr($postParam, "password"));//注册部分就展示部分代码了$info = new UserModel();$info->id = Uuid::uuid4();$info->number = createUserNumber();$info->short_name = substr($email, 0, strripos($email, "@"));$info->email = $email;if ($info->save() === false) {throw new Exception('Operation error!');}//发送用户注册成功的问候邮件,将要发送邮件的邮箱推送到消息队列$result = RedisUtils::redisQueueSendForSendRegisterWelcomeEmail('common\job\UserRegisterJob@sendRegisterSuccessWelcomeEmail', $email);if ($result['success'] === false) {throw new Exception('redis queue error!');}//这里只展示发送邮件的代码示例其他都是一样的道理db()->commit();$result = result_success('Register successful!');} catch (Exception $e) {db()->rollback();$result = result_error($e->getMessage());}return $result;}
推送发送邮件消息队列(生产者)
/*** 用户注册成功需要发送问候邮件的用户数据加入队列* @param string $job 处理该任务的任务名* @param string $data 加入队列的数据-邮箱号* @param string $queue_name 队列名,可以不写*/public static function redisQueueSendForSendRegisterWelcomeEmail($job, $data, $queue_name = 'user_register_email'){//此处做了延时推送,原因是邮件服务是自己写程序实现的避免高并发导致发送失败,所以延时推送一下$isPushed = Queue::later(5, $job, $data, $queue_name);if ($isPushed !== false) {$result = result_success('队列加入成功');} else {$result = result_error('队列加入失败');}return $result;}
消息队列处理逻辑(消费者)
<?phpnamespace common\job;use common\utils\email\EmailUtils;
use common\utils\gateway\GatewaysUtils;
use common\utils\log\LoggerUtils;
use common\utils\systemMessage\SystemMessageUtils;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use think\queue\Job;/*** 处理所有用户注册方面的队列数据,代码逻辑写在这里,运行方式是命令行执行的* Class UserRegister* @package common\job*/
class UserRegisterJob
{/*** 处理发送用户注册成功邮件的队列* @param Job $job* @param string $data 要发送邮件的邮箱*/public function sendRegisterSuccessWelcomeEmail(Job $job, $data){$result = EmailUtils::sendUserRegisterSuccessEmail($data);if ($result['success'] === false) {//判断一下发送失败的次数,超过3次剔除队列$attempts = $job->attempts();if ($attempts > 3) {//发送失败,写进日志,邮件通知开发者$message = '新用户注册发送问候邮件失败,程序错误内容:' . $result['msg'] . ',数据源:' . $data;LoggerUtils::systemErrorLog()->info($message);EmailUtils::sendSystemErrorEmailToDeveloper($message);$job->delete();}} else {//发送成功,剔除队列$job->delete();}}
}
启动队列监听
进入项目根目录执行
php think queue:work --queue 队列名1,队列名2
多个队列可以用逗号拼接一次性监听
这个进行一般都要后台运行且开机自启动,自己写的脚本如下:
#!/bin/bash
#启动Redis队列监听
cd /www/wwwroot/english-e-commerce/ && php think queue:listen --queue user_register_email,user_register_workman_message,sync_user_to_tidio,order_sync_mabang_track,fulfillment_shopify_order &
开机启动方法根据不同linux系统有很多种此处不做记录
不喜勿喷,也是初学。记录一下方便后面查找
参考链接
- ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
- 消息队列使用的四种场景介绍
相关文章:
Redis消息队列与thinkphp/queue操作
业务场景 场景一 用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。 遇到两个问题: 由于代码是串行方式,流程大致为:开…...
【Ubuntu】常用命令
一般操作 pwd(present working directory) 显示当前的工作目录/路径。 cd (change directory) 改变目录,用于输入需要前往的路径/目录。 有一些特殊命令也很常用 : 解释 前往同一级的另一个目录 cd ../directory name cd .. 表示进入上…...
稀碎从零算法笔记Day22-LeetCode:
题型:链表 链接:2. 两数相加 - 力扣(LeetCode) 来源:Leet 题目描述 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 …...
Nacos下载和安装
(1)下载地址和版本 下载地址:Releases alibaba/nacos GitHub 解压在没有中文及空格的文件夹 (2)启动nacos服务 在bin目录下,打开命令行,输入 启动命令:sh startup.sh -m standalone - Linux/Unix/Mac …...
pandas简介(python)
pandas是什么 Pandas 是一个开源的第三方 Python 库,从 Numpy 和 Matplotlib 的基础上构建而来,享有数据分析“三剑客之一”的盛名(NumPy、Matplotlib、Pandas)。Pandas 已经成为 Python 数据分析的必备高级工具,它的…...
个人网站制作 Part 13 添加搜索功能[Elasticsearch] | Web开发项目
文章目录 👩💻 基础Web开发练手项目系列:个人网站制作🚀 添加搜索功能🔨使用Elasticsearch🔧步骤 1: 安装Elasticsearch🔧步骤 2: 配置Elasticsearch🔧步骤 3: 创建索引 …...
Springboot+vue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。
演示视频: Springbootvue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。 项目介绍: 采用M(model)V(view)C(controller)三层…...
vue3 + vite 实现一个动态路由加载功能
假设后端返回的格式是这样子 {"menu": [{"path": "/admin","name": "adminLayout","redirect": "/admin/index","componentPath": "/layout/admin/index.vue","children&quo…...
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024)
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024) 大会主题: (主题包括但不限于, 更多主题请咨询会务组苏老师) 区块链: 区块链技术和系统 分布式一致性算法和协议 块链性能 信息储存系统 区块链可扩展性 区块…...
若依jar包运行脚本,从零到一:用Bash脚本实现JAR应用的启动、停止与监控
脚本使用说明: 启动应用:sh app.sh start停止应用:sh app.sh stop检查应用状态:sh app.sh status重启应用:sh app.sh restart 注意事项: 请确保你的系统上安装了 Java 环境,并且 ruoyi-admin…...
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib)
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib) 在 FreeBSD 系统上设置用户环境变量可以通过编辑用户的 Shell配置文件 来实现。 cshrc 与 csh_profile 的区别: cshrc: 每个脚本执行前都执行一遍这个脚本。 csh_profile: 根据不同使用者用户名, 会先去其 home…...
Apache Dolphinscheduler - 无需重启 Master-Server 停止疯狂刷日志解决方案
记录的是一个 3.0 比较难搞的问题,相信不少使用过 3.0 的用户都遇到过 Master 服务中存在一些工作流或者任务流一直不停的死循环的问题,导致疯狂刷日志。不过本人到现在也没找到最关键的触发原因,只是看到一些连锁反应带来的结果…… 影响因素…...
竞争优势:大型语言模型 (LLM) 如何重新定义业务策略
人工智能在内容创作中的突破 在当今快节奏的商业环境中,像 GPT-4 这样的大型语言模型 (LLM) 不再只是一种技术新颖性; 它们已成为重新定义跨行业业务战略的基石。 从增强客户服务到推动创新,法学硕士提供了企业不容忽视的竞争优势。 1. 加强…...
Spring AOP和AspectJ AOP区别
Spring AOP(Aspect-Oriented Programming)和 AspectJ AOP 是两种不同的 AOP 实现方式,它们在实现上有一些区别。下面是它们之间的主要区别: 基于代理 vs 字节码增强: Spring AOP: Spring AOP 是基于代理的…...
FREERTOS信号量详解
信号量是操作系统中重要的一部分,信号量一般用来进行资源管理和任务同步,资源管理其实就是用变量来标记现有资源的数量,任务同步其实就是用标志位来控制任务的先后执行顺序,这些概念在操作系统中以及裸机开发中都有所涉及。 FreeR…...
每天学习一个Linux命令之vim
每天学习一个Linux命令之vim Vim是一款功能强大的文本编辑器,在Linux系统中广泛使用。本篇博客将介绍一些常用的Vim命令及其选项,帮助您更好地使用Vim进行文本编辑。 命令及选项 以下是Vim的常用命令及其可用选项: 1. 打开文件 $ vim fi…...
linux环境部署
war包环境 在Linux系统上部署准备war包环境 查看linux当前版本和系统类型 [rootlocalhost ~]# uname -a Linux localhost.localdomain 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linuxlinux 打包文件夹 使用tar命令࿱…...
上位机图像处理和嵌入式模块部署(qmacvisual图像预处理)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 不管大家是在读书的时候学习的图像处理,还是在后来的工作中,重新学习了图像处理,相信大家对图像预处理的概念并…...
C语言内存函数详解
文章目录 前言一、memcpy函数(内存拷贝函数)二、memmove重叠拷贝函数三.memset内存设置函数四.memcmp内存比较函数总结 前言 我们之前按学习了C语言标准库中提供了一系列的字符和字符串库函数,接下来我们就学习一下关于内存相关的一些函数。…...
详解Redis的持久化RDB和AOF
Redis的持久化是将内存中的数据同步到硬盘的过程 具体来说,Redis支持两种主要的持久化方式:RDB 和 AOF。 RDB(Redis Database) 简介 默认持久化方式 RDB会将内存中的数据快照保存到磁盘上的一个二进制文件中。这个文件包含了…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
