Redis - 消息队列 Stream
一、概述
消息队列
定义- 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成
- 消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)
- 生产者:负责 产生并发送 消息到队列的应用程序
- 消费者:负责从队列 获取并处理 消息的应用程序
功能:实现消息发送和处理的解耦,支持异步通信,提高系统的可扩展性和可靠性- 主流消息队列解决方案
- RabbitMQ:轻量级,支持多种协议,适合中小规模应用
- RocketMQ:阿里开源,高性能,适合大规模分布式应用
Stream
定义:Stream:Redis 5.0 引入的一种数据类型,用于处理高吞吐量的消息流、事件流等场景功能:按时间顺序 ”添加、读取、消费“ 消息,支持消费者组、消息确认等功能
二、Stream 工作流程
- 写入消息:
- 生产者通过
XADD向 Stream 中添加消息。每条消息自动获得唯一的 ID,按时间顺序存入 Stream。
- 生产者通过
- 创建消费者组
- 如果使用消费者组,首先需要通过
XGROUP CREATE创建消费者组。 - 消费者组会根据时间顺序将消息分配给组内的消费者。
- 如果使用消费者组,首先需要通过
- 读取消息:
- 消费者使用
XREADGROUP命令读取 Stream 中的消息。 - 消息按规则分配给不同消费者处理,每个消费者读取到不同的消息。
- 消费者使用
- 确认消息:
- 消费者在处理完消息后,使用
XACK命令确认消息,表示该消息已成功处理。 - 如果消息未确认(例如消费者崩溃或超时),它将保持在 Pending 状态,等待重新分配给其他消费者。
- 消费者在处理完消息后,使用
- 重新分配未确认消息:
- 如果消息在一定时间内没有被确认,其他消费者可以读取未确认的消息并进行处理。
- 可通过
XPENDING命令查看未确认消息,或在消费者组中设置时间阈值自动重新分配。
- 删除消费者组:
- 不再需要消费者组时,使用
XGROUP DESTROY命令删除消费者组
- 不再需要消费者组时,使用
三、Stream 实现
消费者组模式
定义:Redis Streams 的一部分,用于处理消息的分布式消费优点- 消息分流:多消费者争抢消息,加快消费速度,避免消息堆积
- 消息标示:避免消息漏读,消费者读取消息后不马上销毁,加入 consumerGroup 维护的 pending list 队列等待 ACK
- 消息确认:通过消息 ACK 机制,保证消息至少被消费一次
- 可以阻塞读取,避免盲等
实现方法:通过 Stream 数据类型实现消息队列,命令以 “X” 开头
常用命令
XGROUP CREATE key groupName ID [MKSTREAM]
- 功能:创建消费者组
- 参数
- key:队列名称
- groupName:组名称
- ID:起始 ID 标识,$ 表示队列中最后一个消息,0 表示队列中第一个消息
- MKSTREAM:队列不存在则创建队列
XGROUP DESTORY key groupName
- 功能:删除指定消费者组
XGROUP CREATECONSUMER key groupName consumerName
- 功能:添加组中消费者
XGROUP DELCONSUMER key groupName consumerName
- 功能:删除组中消费者
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
- 功能:读取组中的消息
- gourp:消费者组名称
- consumer:消费者名称(不存在则自动创建)
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动 ACK,获取到消息后自动确认
- STREAMS KEY:指定队列名称
- ID:获取消息的起始 ID,
>表示从下一个未消费消息开始 (常用)
XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]
- 功能:获取 pending-list 中的消息
- IDLE:获取消息后、确认消息前的这段时间,空闲时间超过 min-idle-time 则取出
- start:获取的最小目标 ID
- end:获取的最大目标 ID
- count:获取的数量
- consumer:获取 consumer 的 pending-list
XACK key group ID [ ID … ]
- 功能:确认从组中读取的消息已被处理
- key:队列名称
- group:组名称
- ID:消息的 ID
表格版命令
-
命令
命令 功能 XGROUP CREATE key groupName ID [MKSTREAM] 创建消费者组 XGROUP DESTORY key groupName 删除指定消费者组 XGROUP CREATECONSUMER key groupName consumerName 添加组中消费者 XGROUP DELCONSUMER key groupName consumerName 删除组中消费者 XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 读取组中的消息,ID 填写 “ >” 则读取第一条未读消息XACK key group ID [ ID … ] 确认从组中读取的消息已被处理 -
属性
属性名 定义 key 队列名称 groupName 消费者组名称 ID 起始 ID 标示,$ 代表队列中最后一个消息,0 代表第一个消息 MKSTREAM 队列不存在时自动创建队列 BLOCK milliseconds 没有消息时的最大等待时长 NOACK 无需手动 ACK,获取到消息后自动确认 STREAMS key 指定队列名称
运行逻辑
while(true) {// 尝试监听队列,使用阻塞模式,最长等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 处理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新读取阻塞队列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null) // 如果阻塞队中的消息已经全部处理则退出pending-listbreak;try {handleMessage(msg); // 重新处理 pending-list 中的消息} catch (Exception e){continue; // 如果还出错, 则继续重新读取}}}
}
四、示例
-
目标:消息队列实现数据库异步修改数据库,将下单 message 缓存在 redis 中,减小下单操作对数据库的冲击
-
项目结构
- RedisConfig 配置类:创建消费者组是一次性的操作,适合放在配置类中
- VoucherOrderHandler 内部类:消费者的逻辑和订单业务相关,因此适合放在 VoucherOrderServiceImpl 中
- 多线程启动逻辑:消费者线程的启动与订单业务密切相关,直接放在 VoucherOrderServiceImpl 类中更符合职责分离原则
src/main/java ├── com/example │ ├── config │ │ └── RedisConfig.java // Redis 配置类,包含消费者组初始化 │ ├── service │ │ ├── VoucherOrderService.java │ │ └── impl │ │ └── VoucherOrderServiceImpl.java // 包含 VoucherOrderHandler 内部类 │ ├── entity │ │ └── VoucherOrder.java // 优惠券订单实体 │ ├── utils │ │ └── BeanUtil.java // 用于 Map 转 Bean 的工具类 │ └── controller │ └── VoucherOrderController.java // 如果有 Controller -
创建消费者组(config.RedisConfig)
@Bean public void initStreamGroup() {// 检查是否存在消费者组 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,抛出异常,可忽略log.warn("消费者组 g1 已存在");} } -
创建消费者线程
- 位置:作为 VoucherOrderServiceImpl 内的预构造部分
@PostConstruct public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 个线程,模拟多个消费者new Thread(new VoucherOrderHandler()).start();} } -
添加消息到消息队列 (src/main/resources/lua/SECKILL_SCRIPT.lua)
--1. 参数列表 --1.1. 优惠券id local voucherId = ARGV[1] --1.2. 用户id local userId = ARGV[2] --1.3. 订单id local orderId = ARGV[3]--2. 数据key local stockKey = 'seckill:stock:' .. voucherId --2.1. 库存key local orderKey = 'seckill:order' .. voucherId --2.2. 订单key--3. 脚本业务 --3.1. 判断库存是否充足 get stockKey if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1 end --3.2. 判断用户是否重复下单 SISMEMBER orderKey userId if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2 end --3.4 扣库存 incrby stockKey -1 redis.call( 'INCRBY', stockKey, -1 ) --3.5 下单(保存用户) sadd orderKey userId redis.call( 'SADD', orderKey, userId ) -- 3.6. 发送消息到队列中 redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId ) -
创建消费者类(ServiceImpl)
- 位置:作为 VoucherOrderServiceImpl 内的私有类
// 在ServiceImpl中创建一个VoucherOrderHandler消费者类,专门用于处理消息队列中的消息 private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 获取消息队列中的订单信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 没有消息则重新监听if (list == null || list.isEmpty() ) continue;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认当前消息已消费 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("处理订单异常", e);// 6. 处理订单失败则消息会加入pending-list,继续处理pending-listhandlePendingList();}}}// 处理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消费pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), // 消费者此消息的消费者StreamReadOptions.empty().count(1), // StreamOffset.create("stream.order", ReadOffset.from("0")) // 从pending-list的第一条消息开始读);// 2. 退出条件, list 为空 -> pending-list 已全部处理if(list == null || list.isEmpty()) break;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认消息已消费(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理pendding订单异常", e);try{Thread.sleep(20); // 如果发生异常则休眠一会再重新消费pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}} } -
创建消息方法
- 目标:用户通过这个方法发送一条创建订单的 Message 给 Redis Stream
// 创建Lua脚本对象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua脚本初始化 (通过静态代码块) static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class); }@Override public void createVoucherOrder(Long voucherId, Long userId) {// 生成订单 ID(模拟)long orderId = System.currentTimeMillis();// 执行 Lua 脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根据 Lua 脚本返回结果处理if (result == 1) {throw new RuntimeException("库存不足!");} else if (result == 2) {throw new RuntimeException("不能重复下单!");}// 如果脚本执行成功,则订单消息会进入 Redis Stream,消费者组会自动处理System.out.println("订单创建成功!"); }
(缺陷) 单消费者模式
常用命令- XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
缺陷:有消息漏读风险
五、其他消息队列方案
(缺陷) List 实现
优点- 不受 JVM 内存上限限制:因为利用 Redis 存储
- 数据安全 :因为基于 List 结构本身是数据存储,基于 Redis 持久化机制
- 消息有序性:通过 List 结构的 LPUSH & BRPOP 命令实现顺序
缺点- 消息丢失:BRPOP 的时候如果宕机则消息会丢失
- 只支持单消费者
(缺陷) PubSub 实现
- 定义
- Publish & Subscribe 模型,一种消息队列模型
- 生产者向指定的 channel 来 public 消息
- 消费者从 subscribe 的 channel 中接收消息
- 功能:支持多消费者模式,多个消费者可以同时 subscribe 一个 channel
- 优点:采用发布订阅模型,支持多生产者、消费者
- 缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
三种消息队列对比

相关文章:
Redis - 消息队列 Stream
一、概述 消息队列 定义 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)生产者:负责 产…...
Docker:国内加速源
阿里云docker加速云: sudo tee /etc/docker/daemon.json <<EOF { “registry-mirrors”: [“https://euf11uji.mirror.aliyuncs.com”] } EOFhttps://docker.mozhu.dev/ sudo tee /etc/docker/daemon.json <<EOF {"registry-mirrors": [&qu…...
Android Studio更改项目使用的JDK
一、吐槽 过去,在安卓项目中配置JDK和Gradle的过程非常直观,只需要进入Android Studio的File菜单中的Project Structure即可进行设置,十分方便。 原本可以在这修改JDK: 但大家都知道,Android Studio的狗屎性能,再加…...
ubuntu+ros新手笔记(四):gazebo无法加载
以下为ChatGPT 的解决方案,对我来说是可行的!! 我按照第2步操作就解决辣!! 我的提问: 在ubuntu 22.04 和ros2 humble环境下,gazebo加载不了 ChatGPT 回答: 在 Ubuntu 22.04 和 …...
vue季度选择器(antd2.0 版本无此控件,单独写一个)
vue季度选择器 效果显示 效果显示 <template><div><a-popoverplacement"bottom"overlayClassName"season-picker"trigger"click"v-model"showSeason"><template #content><div class"season-picker-b…...
C/C++代码性能优化技巧的书籍及资料
使用C/C开发的场景,大多对代码的执行的速度,实时性有较高的要求,像嵌入式系统的开发,资源还受限。在算力存储空间有限的MCU上写出简洁又高效的代码实际是一种艺术。软件工程师在代码设计上的这种差距,会反映在产品的性…...
通俗易懂的 Nginx 反向代理 配置
通俗易懂的 Nginx 反向代理 配置 首先 root 与 alias 的区别 root 是直接拼接 root location location /i/ {root /data/w3; }当请求 /i/top.gif ,/data/w3/i/top.gif 会被返回。 alias 是用 alias 替换 location location /i/ {alias /data/w3/images/; }当请…...
docker设置容器自动启动
说起开机自动启动应该很多人都遇到过,我们公司做的系统很多的中间件都没有设置开机自动启动然后中间修改问题又设置了一些临时生效的文件,开始的时候大家都不以为意,知道公司陆续有人离职入职管理交接一塌糊涂,项目成了历史遗留问…...
蓝桥杯刷题——day1
蓝桥杯刷题——day1 题目一题干题目解析代码 题目二题干题目解析代码 题目一 题干 给定一个字符串 s ,验证 s 是否是 回文串 ,只考虑字母和数字字符,可以忽略字母的大小写。本题中,将空字符串定义为有效的 回文串 。 题目链接&a…...
Leetcode 面试150题 399.除法求值
系列博客目录 文章目录 系列博客目录题目思路代码 题目 链接 思路 广度优先搜索 我们可以将整个问题建模成一张图:给定图中的一些点(点即变量),以及某些边的权值(权值即两个变量的比值),试…...
活动预告 |【Part2】Microsoft 安全在线技术公开课:安全性、合规性和身份基础知识
课程介绍 通过参加“Microsoft 安全在线技术公开课:安全性、合规性和身份基础知识”活动提升你的技能。在本次免费的介绍性活动中,你将获得所需的安全技能和培训,以创造影响力并利用机会推动职业发展。你将了解安全性、合规性和身份的基础知…...
Unity游戏实战
很小的时候在键盘机上玩过一个游戏叫寻秦,最近看有大佬把他的安卓版做出来了,打开封面就是Unity,想自己也尝试一下。...
SQL中的替换函数replace() 使用
这条 SQL 语句的作用是将 tool_tool 表中所有 link 字段包含 https://www.xxspvip.cn 的记录中的 https://www.xxspvip.cn 替换为 http://192.168.1.1。具体解释如下: SQL 语句分解 UPDATE tool_toolSET link REPLACE(link, https://www.xxspvip.cn, http://192.…...
Python面试常见问题及答案5
一、基础语法相关 问题1: Python的可变数据类型和不可变数据类型有哪些? 答案: 在Python中,可变数据类型有列表(list)、字典(dict)、集合(set)。这些数据类型…...
(css)element中el-select下拉框整体样式修改
(css)element中el-select下拉框整体样式修改 重点代码(颜色可行修改) // 修改input默认值颜色 兼容其它主流浏览器 /deep/ input::-webkit-input-placeholder {color: rgba(255, 255, 255, 0.50); } /deep/ input::-moz-input-placeholder {color: rgba…...
点击按钮打开dialog嵌套表格checked数据关闭dialog回显checked数据
介绍:点击按钮打开dialog嵌套表格,勾选数据,点击确认关闭弹窗并且回显选中得数据,回显的数据被删除,dialog里面的数据也被取消勾选,废话不多说 上代码!!! 这里的勾选回显…...
《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三)
《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三) 你好,我是拉依达。 感谢所有阅读关注我的同学支持,目前博客累计阅读 27w,关注1.5w人。其中博客《最全Linux驱动开发全流程详细解析(持续更新)-CSDN博客》已经是 Linux驱动 相关内容搜索的推荐首位,感谢大家支持。 《拉…...
大模型呼出机器人有哪些优势和劣势?
大模型呼出机器人有哪些优势和劣势? 原作者:开源呼叫中心FreeIPCC,其Github:https://github.com/lihaiya/freeipcc 大模型呼出机器人在实际应用中展现出了一系列优势和劣势,以下是对其优势和劣势的详细分析ÿ…...
Python鼠标轨迹算法(游戏防检测)
一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序,它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言,原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势: 模拟…...
安宝特分享 | AR技术助力医院总院与分院间的远程面诊
随着科技的迅猛发展,增强现实(AR)技术在各行各业的应用愈发广泛,特别是在医疗领域,其潜力和价值正在被不断挖掘。在现代医疗环境中,患者常常面临“看病难、看病远、看病急”等诸多挑战,而安宝特…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...
