基于Redis的Stream结构作为消息队列,实现异步秒杀下单
文章目录
- 1 认识消息队列
- 2 基于List实现消息队列
- 3 基于PubSub的消息队列
- 4 基于Stream的消息队列
- 5 基于Stream的消息队列-消费者组
- 6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
1 认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

消息队列是一种常用的中间件,它可以用来实现系统解耦、流量削峰和异步调用等功能。
所谓解耦,举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,我们(消费者)从快递柜里边去拿东西,这就是一个异步,如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。
这种场景在我们秒杀中就变成了:我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。
常见的消息队列中间件有RabbitMQ、Kafka和RocketMQ等。这些消息队列需要独立安装部署,虽然它们具有高性能和高可靠性的优点,但是额外部署这些中间件也会增加运维成本和服务器成本。
Redis也可以用来实现消息队列。Redis提供了几种不同的方式来实现消息队列,包括使用List、ZSet、PubSub模式和Stream等。这些方式各有优劣,适用于不同的应用场景。
- 使用List实现消息队列:这种方式最为简单直接,它主要通过lpush和rpop命令来存入和读取消息。使用List实现消息队列的优点是消息可以被持久化,但缺点是不支持重复消费、没有按照主题订阅的功能、不支持消费者确认机制等。
- 使用ZSet实现消息队列:这种方式与使用List类似,但由于ZSet多了一个分值(score)属性,我们可以使用它来实现更多的功能,比如用它来存储时间戳,以此来实现延迟消息队列等。ZSet同样具备持久化的功能,但也存在与List类似的问题。
- 使用发布订阅模式实现消息队列:这种方式可以实现主题订阅的功能。但缺点是无法持久化保存信息,如果Redis服务器宕机或者重启,那么所有的消息都会丢失;发布订阅模式是“发后即忘”的工作模式,如果都订阅者离线重连之后就不能消费之前的历史消息了;不支持消费者确认机制,稳定性不能得到保证。
- 使用Stream实现消息队列:这种方式是在Redis 5.0之后新增的。我们可以使用Stream的xadd和xrange命令来实现消息的存入和读取,并且Stream提供了xack命令来手动确认消息消费,用它我们就可以实现消费者确认的功能。
2 基于List实现消息队列
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

基于List的消息队列有哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
3 基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

基于PubSub的消息队列有哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
4 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:

例如:

读取消息的方式之一:XREAD

例如,使用XREAD读取第一个消息:

XREAD阻塞方式,读取最新的消息:

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
5 基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

创建消费者组:

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
>:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路:

STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
最后我们来个小对比

6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
修改lua表达式,新增3.6

VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);//处理异常消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有异常消息,结束循环break;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理pendding订单异常", e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}相关文章:
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
文章目录 1 认识消息队列2 基于List实现消息队列3 基于PubSub的消息队列4 基于Stream的消息队列5 基于Stream的消息队列-消费者组6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单 1 认识消息队列 什么是消息队列:字面意思就是存放消息的队列。最…...
ePWM模块-时基模块(2)
ePWM模块(2) 时基模块的使用 TBPRD:周期寄存器 (设置的时钟周期存入此,可通过阴影寄存器缓冲后写入,也可通过活动寄存器立即写入) TBCTR:时基计数变值寄存器 (时基当前所计数的值存入,用于和所设定周期值比较) TBPHS:时基相位寄存器 TBSTS:时基状态寄存器 …...
让GPT对话写小说
## 来自你的消息: 写个小说 ## 来自ChatAI的消息: 很好,我可以为您编写一个小故事。以下是一个短小精干的故事: 《夜行者》 夜,是个寂静而神秘的时间,很多人都选择睡眠。但在这个城市,有一群人——夜行者,他…...
Docker 应用部署-MySQL
一、安装MySQL 1搜索mysql镜像 docker search mysql 2拉取mysql镜像 docker pull mysql:8.0.20 3创建容器 通过下面的命令,创建容器并设置端口映射、目录映射 #在用户名目录下创建mysql目录用于存储mysql数据信息 mkdir /home/mysql cd /home/mysql #创建docker容…...
电容笔哪个厂家的产品比较好?苹果平板的电容笔推荐
从目前来说,这个苹果的正版电容笔,售价真的是太贵了,一支就要接近上千元。事实上,对于那些没有很多预算的人来说,平替电容笔是一个很好的选择。一支苹果电容笔,价格是四支平替电容笔的四倍,但平…...
今年的面试难度有点大....
大家好,最近有不少小伙伴在后台留言,又得准备面试了,不知道从何下手! 不论是跳槽涨薪,还是学习提升!先给自己定一个小目标,然后再朝着目标去努力就完事儿了! 为了帮大家节约时间&a…...
【PWN · ret2libc】ret2libc2
ret2libc1的略微进阶——存在systemplt但是不存在“/bin/sh”怎么办? 目录 前言 python3 ELF 查看文件信息 strings 查看寻找"/bin/sh" IDA反汇编分析 思路及实现 老规矩,偏移量 offset EXP编写 总结 前言 经过ret2libc1的洗礼&a…...
深度学习01-tensorflow开发环境搭建
文章目录 简介运行硬件cuda和cuddntensorflow安装。tensorflow版本安装Anaconda创建python环境安装tensorflow-gpupycharm配置配置conda环境配置juypternotebook 安装cuda安装cudnn安装blas 云服务器运行云服务器选择pycharm配置代码自动同步远程interpreter 简介 TensorFlow是…...
linux相关操作
1 系统调用 通过strace直接看程序运行过程中的系统调用情况 其中每一行为一个systemcall ,调用write系统调用将内容最终输出。 无论什么编程语言都必须通过系统调用向内核发起请求。 sar查看进程分别在用户模式和内核模式下的运行时间占比情况, ALL显…...
PMP项目管理-[第十章]沟通管理
沟通管理知识体系: 规划沟通管理: 10.1 沟通维度划分 10.2 核心概念 定义:通过沟通活动(如会议和演讲),或以工件的方式(如电子邮件、社交媒体、项目报告或项目文档)等各种可能的方式来发送或接受消息 在项目沟通中,需要…...
13个UI设计软件,一次满足你的UI设计需求
UI设计师的角色是当今互联网时代非常重要的一部分。许多计算机和移动软件都需要UI设计师的参与,这个过程复杂而乏味。这里将与您分享13个UI设计软件,希望帮助您正确选择UI设计软件,节省工作量,创建更多优秀的UI设计作品。 1.即时…...
sentinel介绍
介绍 官网地址 Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源&…...
手把手教你怎么搭建自己的ChatGPT和Midjourney绘图(含源码)
AI程序采用NUXT3LARAVEL9开发(目前版本V1.1.7) 授权方式:三个顶级域名两次更换 1.AI智能对话-对接官方和官方反代(markdown输出)PS:采用百度与自用库检测文字 2.AI绘图-根据关键词绘图-增加dreamStudio绘画-增加mid…...
继承多态经典笔试题
注:visual studio复制当前行粘贴到下一行: CTRLD 杂项 调用子类重写的虚函数(带默认参数),但参数用的是基类的虚函数中的默认参数: 这是由于参数是在编译时压入 试题一 交换两个基类指针指向的对象的vf…...
如何使用Typeface-Helper-自定义字体
随着科技的不断发展,人们对于视觉效果的要求也越来越高。在设计领域中,字体设计是非常重要的一环,因为它直接影响了整个设计的风格和品质。因此,越来越多的设计师开始寻找能够帮助他们自定义字体的工具。在这个过程中,…...
SubMain CodeIt.Right 2022.2 Crack
CodeIt.Right,从源头上提高产品质量,在编写代码时获取有关问题的实时反馈,支持最佳实践和合规性,自动执行代码审查,轻松避免与您的群组无关的通知,一目了然地了解代码库的运行状况 自动执行代码审查 使用自…...
文艺复兴的核心是“以人为本”:圣母百花大教堂(Duomo)
文章目录 引言I 圣母百花大教堂的建筑技术故事1.1 布鲁内莱斯基1.2 表现三维立体的透视画法II 美第奇家族的贡献2.1 科西莫德美第奇2.2 洛伦佐美第奇III 历史中的偶然性与必然性。3.1 文艺复兴的诞生其实是必然的事情3.2 文艺复兴的偶然性引言 从科技的视角再次理解文艺复兴,…...
校招失败后,在小公司熬了 2 年终于进了百度,竭尽全力....
其实两年前校招的时候就往百度投了一次简历,结果很明显凉了,随后这个理想就被暂时放下了,但是这个种子一直埋在心里这两年除了工作以外,也会坚持写博客,也因此结识了很多优秀的小伙伴,从他们身上学到了特别…...
【C++学习】函数模板
模板的概念 模板就是建立通用的模具,大大提高复用性。 模板的特点: 模板不可以直接使用,它只是一个模型 模板的通用不是万能的 基本语法 C中提供两种模板机制:函数模板和类模板 函数模板作用: 建立一个通用函数&…...
1960-2014年各国二氧化碳排放量(人均公吨数)
1960-2014年各国二氧化碳排放量(人均公吨数)(世界发展指标, 2019年12月更新) 1、来源:世界发展指标 2、时间:1960-2014年 3、范围:世界各国 4、指标: 二氧…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
leetcode_69.x的平方根
题目如下 : 看到题 ,我们最原始的想法就是暴力解决: for(long long i 0;i<INT_MAX;i){if(i*ix){return i;}else if((i*i>x)&&((i-1)*(i-1)<x)){return i-1;}}我们直接开始遍历,我们是整数的平方根,所以我们分两…...
学习 Hooks【Plan - June - Week 2】
一、React API React 提供了丰富的核心 API,用于创建组件、管理状态、处理副作用、优化性能等。本文档总结 React 常用的 API 方法和组件。 1. React 核心 API React.createElement(type, props, …children) 用于创建 React 元素,JSX 会被编译成该函数…...
【笔记】结合 Conda任意创建和配置不同 Python 版本的双轨隔离的 Poetry 虚拟环境
如何结合 Conda 任意创建和配置不同 Python 版本的双轨隔离的Poetry 虚拟环境? 在 Python 开发中,为不同项目配置独立且适配的虚拟环境至关重要。结合 Conda 和 Poetry 工具,能高效创建不同 Python 版本的 Poetry 虚拟环境,接下来…...
汇编语言学习(三)——DoxBox中debug的使用
目录 一、安装DoxBox,并下载汇编工具(MASM文件) 二、debug是什么 三、debug中的命令 一、安装DoxBox,并下载汇编工具(MASM文件) 链接: https://pan.baidu.com/s/1IbyJj-JIkl_oMOJmkKiaGQ?pw…...
前端异步编程全场景解读
前端异步编程是现代Web开发的核心,它解决了浏览器单线程执行带来的UI阻塞问题。以下从多个维度进行深度解析: 一、异步编程的核心概念 JavaScript的执行环境是单线程的,这意味着在同一时间只能执行一个任务。为了不阻塞主线程,J…...
