Redis消息队列与thinkphp/queue操作
业务场景
场景一
用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。
遇到两个问题:
- 由于代码是串行方式,流程大致为:开启数据库事务回滚->数据入库准备->发邮件->发实时消息->推送第三方平台->提交写入数据库。但是后续的3个步骤任意一个流程出了问题都会影响用户的注册结果。
- 发送邮件使用的不是成熟的第三方产品,而是利用
phpmaile自写代码实现的,然而这个过程耗时相对较长且偶尔有失败的情况;另外通过接口推送注册用户数据到的第三方平台是一个国外的产品接口通讯时间很长且一样有失败的情况。
以上两个问题就会导致用户的注册交互流程时间很长产品体验感非常差;且发送邮件、发送消息、推送数据任意一个步骤由于特殊情况导致执行失败都不能终止用户注册这样就只能通过日志捕获相应的失败情况。
场景二
用户在Shopify平台(一个跨境电商平台)付款下单后,商家会将订单同步到我的系统中,在我的系统中完成询价、报价、付款后我需要再将订单数据推送到第三方配货发货的平台。平台发货完成后通过设置好的回调地址通知我的系统发货的物流信息数据,我需要将物流信息数据存入到我的数据库后再将物流信息同步给Shopify平台用以展示给真实下单用户查看物流轨迹。
遇到一个问题:
- 正常情况下的回调代码逻辑是将物流信息写入数据库,再同步物流数据给Shopify。但是由于各种原因后者(同步物流数据给Shopify)有一定概率会失败。
这样就出现了我系统内成功展示了物流信息而Shopify反馈没有成功同步物流轨迹的订单出现。而回调又是一次性的我只能自查数据库进行回补。
英雄登场(消息队列Redis)
官方介绍:消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。
Redis安装
我用的宝塔安装的方便快捷,软件商品搜索Redis然后点击系统对应php版本的立即前往

再后续弹窗中安装redis扩展即可

后续Redis中的队列数据也可以通过宝塔进行查看:

thinkphp/queue
扩展这个内置了 Redis、Database、Topthink、Sync四种驱动,这里我用的Redis。think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作。
thinkphp/queue引入扩展
composer require topthink/think-queue
thinkphp/queue配置文件
我使用的是TP5要再application/extra目录下新增queue.php文件,文件内容如下(视各自情况调整哈):
return ['connector' => 'Redis', // 驱动类型'expire' => 60, // 任务的过期时间,默认为60秒; 如果任务执行时间超过此时间将会被认为是过期,将不会被执行'default' => 'default', // 默认的队列名称'host' => '127.0.0.1', // Redis 主机地址'port' => 6379, // Redis 端口'password' => '', // Redis 密码'select' => 0, // 使用哪一个 Redis 数据库'timeout' => 0, // 连接超时时间'persistent' => false, // 是否长连接
];
解决方案(注册部分)
引入消息队列后就是将原来串行方式改为并行,用户注册逻辑代码中关于后三个步骤只要单纯的推送队列即可。而后三者采用并行方式(也就是异步)执行对应的逻辑。这样既提高了注册的速度又可以通过队列将出错的数据多次执行提高成功率
注册逻辑代码
public static function doSaveRegister($postParam){db()->startTrans();try {$first_name = trim(outputstr($postParam, "first_name"));$last_name = trim(outputstr($postParam, "last_name"));$email = trim(outputstr($postParam, "email"));$password = trim(outputstr($postParam, "password"));//注册部分就展示部分代码了$info = new UserModel();$info->id = Uuid::uuid4();$info->number = createUserNumber();$info->short_name = substr($email, 0, strripos($email, "@"));$info->email = $email;if ($info->save() === false) {throw new Exception('Operation error!');}//发送用户注册成功的问候邮件,将要发送邮件的邮箱推送到消息队列$result = RedisUtils::redisQueueSendForSendRegisterWelcomeEmail('common\job\UserRegisterJob@sendRegisterSuccessWelcomeEmail', $email);if ($result['success'] === false) {throw new Exception('redis queue error!');}//这里只展示发送邮件的代码示例其他都是一样的道理db()->commit();$result = result_success('Register successful!');} catch (Exception $e) {db()->rollback();$result = result_error($e->getMessage());}return $result;}
推送发送邮件消息队列(生产者)
/*** 用户注册成功需要发送问候邮件的用户数据加入队列* @param string $job 处理该任务的任务名* @param string $data 加入队列的数据-邮箱号* @param string $queue_name 队列名,可以不写*/public static function redisQueueSendForSendRegisterWelcomeEmail($job, $data, $queue_name = 'user_register_email'){//此处做了延时推送,原因是邮件服务是自己写程序实现的避免高并发导致发送失败,所以延时推送一下$isPushed = Queue::later(5, $job, $data, $queue_name);if ($isPushed !== false) {$result = result_success('队列加入成功');} else {$result = result_error('队列加入失败');}return $result;}
消息队列处理逻辑(消费者)
<?phpnamespace common\job;use common\utils\email\EmailUtils;
use common\utils\gateway\GatewaysUtils;
use common\utils\log\LoggerUtils;
use common\utils\systemMessage\SystemMessageUtils;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use think\queue\Job;/*** 处理所有用户注册方面的队列数据,代码逻辑写在这里,运行方式是命令行执行的* Class UserRegister* @package common\job*/
class UserRegisterJob
{/*** 处理发送用户注册成功邮件的队列* @param Job $job* @param string $data 要发送邮件的邮箱*/public function sendRegisterSuccessWelcomeEmail(Job $job, $data){$result = EmailUtils::sendUserRegisterSuccessEmail($data);if ($result['success'] === false) {//判断一下发送失败的次数,超过3次剔除队列$attempts = $job->attempts();if ($attempts > 3) {//发送失败,写进日志,邮件通知开发者$message = '新用户注册发送问候邮件失败,程序错误内容:' . $result['msg'] . ',数据源:' . $data;LoggerUtils::systemErrorLog()->info($message);EmailUtils::sendSystemErrorEmailToDeveloper($message);$job->delete();}} else {//发送成功,剔除队列$job->delete();}}
}
启动队列监听
进入项目根目录执行
php think queue:work --queue 队列名1,队列名2
多个队列可以用逗号拼接一次性监听
这个进行一般都要后台运行且开机自启动,自己写的脚本如下:
#!/bin/bash
#启动Redis队列监听
cd /www/wwwroot/english-e-commerce/ && php think queue:listen --queue user_register_email,user_register_workman_message,sync_user_to_tidio,order_sync_mabang_track,fulfillment_shopify_order &
开机启动方法根据不同linux系统有很多种此处不做记录
不喜勿喷,也是初学。记录一下方便后面查找
参考链接
- ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
- 消息队列使用的四种场景介绍
相关文章:
Redis消息队列与thinkphp/queue操作
业务场景 场景一 用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。 遇到两个问题: 由于代码是串行方式,流程大致为:开…...
【Ubuntu】常用命令
一般操作 pwd(present working directory) 显示当前的工作目录/路径。 cd (change directory) 改变目录,用于输入需要前往的路径/目录。 有一些特殊命令也很常用 : 解释 前往同一级的另一个目录 cd ../directory name cd .. 表示进入上…...
稀碎从零算法笔记Day22-LeetCode:
题型:链表 链接:2. 两数相加 - 力扣(LeetCode) 来源:Leet 题目描述 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 …...
Nacos下载和安装
(1)下载地址和版本 下载地址:Releases alibaba/nacos GitHub 解压在没有中文及空格的文件夹 (2)启动nacos服务 在bin目录下,打开命令行,输入 启动命令:sh startup.sh -m standalone - Linux/Unix/Mac …...
pandas简介(python)
pandas是什么 Pandas 是一个开源的第三方 Python 库,从 Numpy 和 Matplotlib 的基础上构建而来,享有数据分析“三剑客之一”的盛名(NumPy、Matplotlib、Pandas)。Pandas 已经成为 Python 数据分析的必备高级工具,它的…...
个人网站制作 Part 13 添加搜索功能[Elasticsearch] | Web开发项目
文章目录 👩💻 基础Web开发练手项目系列:个人网站制作🚀 添加搜索功能🔨使用Elasticsearch🔧步骤 1: 安装Elasticsearch🔧步骤 2: 配置Elasticsearch🔧步骤 3: 创建索引 …...
Springboot+vue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。
演示视频: Springbootvue的仓库管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。 项目介绍: 采用M(model)V(view)C(controller)三层…...
vue3 + vite 实现一个动态路由加载功能
假设后端返回的格式是这样子 {"menu": [{"path": "/admin","name": "adminLayout","redirect": "/admin/index","componentPath": "/layout/admin/index.vue","children&quo…...
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024)
【征稿进行时|见刊、检索快速稳定】2024年区块链、物联网与复合材料与国际学术会议 (ICBITC 2024) 大会主题: (主题包括但不限于, 更多主题请咨询会务组苏老师) 区块链: 区块链技术和系统 分布式一致性算法和协议 块链性能 信息储存系统 区块链可扩展性 区块…...
若依jar包运行脚本,从零到一:用Bash脚本实现JAR应用的启动、停止与监控
脚本使用说明: 启动应用:sh app.sh start停止应用:sh app.sh stop检查应用状态:sh app.sh status重启应用:sh app.sh restart 注意事项: 请确保你的系统上安装了 Java 环境,并且 ruoyi-admin…...
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib)
Unix运维_FreeBSD-13.1临时环境变量设置(bin和include以及lib) 在 FreeBSD 系统上设置用户环境变量可以通过编辑用户的 Shell配置文件 来实现。 cshrc 与 csh_profile 的区别: cshrc: 每个脚本执行前都执行一遍这个脚本。 csh_profile: 根据不同使用者用户名, 会先去其 home…...
Apache Dolphinscheduler - 无需重启 Master-Server 停止疯狂刷日志解决方案
记录的是一个 3.0 比较难搞的问题,相信不少使用过 3.0 的用户都遇到过 Master 服务中存在一些工作流或者任务流一直不停的死循环的问题,导致疯狂刷日志。不过本人到现在也没找到最关键的触发原因,只是看到一些连锁反应带来的结果…… 影响因素…...
竞争优势:大型语言模型 (LLM) 如何重新定义业务策略
人工智能在内容创作中的突破 在当今快节奏的商业环境中,像 GPT-4 这样的大型语言模型 (LLM) 不再只是一种技术新颖性; 它们已成为重新定义跨行业业务战略的基石。 从增强客户服务到推动创新,法学硕士提供了企业不容忽视的竞争优势。 1. 加强…...
Spring AOP和AspectJ AOP区别
Spring AOP(Aspect-Oriented Programming)和 AspectJ AOP 是两种不同的 AOP 实现方式,它们在实现上有一些区别。下面是它们之间的主要区别: 基于代理 vs 字节码增强: Spring AOP: Spring AOP 是基于代理的…...
FREERTOS信号量详解
信号量是操作系统中重要的一部分,信号量一般用来进行资源管理和任务同步,资源管理其实就是用变量来标记现有资源的数量,任务同步其实就是用标志位来控制任务的先后执行顺序,这些概念在操作系统中以及裸机开发中都有所涉及。 FreeR…...
每天学习一个Linux命令之vim
每天学习一个Linux命令之vim Vim是一款功能强大的文本编辑器,在Linux系统中广泛使用。本篇博客将介绍一些常用的Vim命令及其选项,帮助您更好地使用Vim进行文本编辑。 命令及选项 以下是Vim的常用命令及其可用选项: 1. 打开文件 $ vim fi…...
linux环境部署
war包环境 在Linux系统上部署准备war包环境 查看linux当前版本和系统类型 [rootlocalhost ~]# uname -a Linux localhost.localdomain 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linuxlinux 打包文件夹 使用tar命令࿱…...
上位机图像处理和嵌入式模块部署(qmacvisual图像预处理)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 不管大家是在读书的时候学习的图像处理,还是在后来的工作中,重新学习了图像处理,相信大家对图像预处理的概念并…...
C语言内存函数详解
文章目录 前言一、memcpy函数(内存拷贝函数)二、memmove重叠拷贝函数三.memset内存设置函数四.memcmp内存比较函数总结 前言 我们之前按学习了C语言标准库中提供了一系列的字符和字符串库函数,接下来我们就学习一下关于内存相关的一些函数。…...
详解Redis的持久化RDB和AOF
Redis的持久化是将内存中的数据同步到硬盘的过程 具体来说,Redis支持两种主要的持久化方式:RDB 和 AOF。 RDB(Redis Database) 简介 默认持久化方式 RDB会将内存中的数据快照保存到磁盘上的一个二进制文件中。这个文件包含了…...
利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...
【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
