Redis - 消息队列 Stream
一、概述
消息队列
定义
- 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成
- 消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)
- 生产者:负责 产生并发送 消息到队列的应用程序
- 消费者:负责从队列 获取并处理 消息的应用程序
功能
:实现消息发送和处理的解耦,支持异步通信,提高系统的可扩展性和可靠性- 主流消息队列解决方案
- RabbitMQ:轻量级,支持多种协议,适合中小规模应用
- RocketMQ:阿里开源,高性能,适合大规模分布式应用
Stream
定义
:Stream:Redis 5.0 引入的一种数据类型,用于处理高吞吐量的消息流、事件流等场景功能
:按时间顺序 ”添加、读取、消费“ 消息,支持消费者组、消息确认等功能
二、Stream 工作流程
- 写入消息:
- 生产者通过
XADD
向 Stream 中添加消息。每条消息自动获得唯一的 ID,按时间顺序存入 Stream。
- 生产者通过
- 创建消费者组
- 如果使用消费者组,首先需要通过
XGROUP CREATE
创建消费者组。 - 消费者组会根据时间顺序将消息分配给组内的消费者。
- 如果使用消费者组,首先需要通过
- 读取消息:
- 消费者使用
XREADGROUP
命令读取 Stream 中的消息。 - 消息按规则分配给不同消费者处理,每个消费者读取到不同的消息。
- 消费者使用
- 确认消息:
- 消费者在处理完消息后,使用
XACK
命令确认消息,表示该消息已成功处理。 - 如果消息未确认(例如消费者崩溃或超时),它将保持在 Pending 状态,等待重新分配给其他消费者。
- 消费者在处理完消息后,使用
- 重新分配未确认消息:
- 如果消息在一定时间内没有被确认,其他消费者可以读取未确认的消息并进行处理。
- 可通过
XPENDING
命令查看未确认消息,或在消费者组中设置时间阈值自动重新分配。
- 删除消费者组:
- 不再需要消费者组时,使用
XGROUP DESTROY
命令删除消费者组
- 不再需要消费者组时,使用
三、Stream 实现
消费者组模式
定义
:Redis Streams 的一部分,用于处理消息的分布式消费优点
- 消息分流:多消费者争抢消息,加快消费速度,避免消息堆积
- 消息标示:避免消息漏读,消费者读取消息后不马上销毁,加入 consumerGroup 维护的 pending list 队列等待 ACK
- 消息确认:通过消息 ACK 机制,保证消息至少被消费一次
- 可以阻塞读取,避免盲等
实现方法
:通过 Stream 数据类型实现消息队列,命令以 “X” 开头
常用命令
XGROUP CREATE key groupName ID [MKSTREAM]
- 功能:创建消费者组
- 参数
- key:队列名称
- groupName:组名称
- ID:起始 ID 标识,$ 表示队列中最后一个消息,0 表示队列中第一个消息
- MKSTREAM:队列不存在则创建队列
XGROUP DESTORY key groupName
- 功能:删除指定消费者组
XGROUP CREATECONSUMER key groupName consumerName
- 功能:添加组中消费者
XGROUP DELCONSUMER key groupName consumerName
- 功能:删除组中消费者
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
- 功能:读取组中的消息
- gourp:消费者组名称
- consumer:消费者名称(不存在则自动创建)
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动 ACK,获取到消息后自动确认
- STREAMS KEY:指定队列名称
- ID:获取消息的起始 ID,
>
表示从下一个未消费消息开始 (常用)
XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]
- 功能:获取 pending-list 中的消息
- IDLE:获取消息后、确认消息前的这段时间,空闲时间超过 min-idle-time 则取出
- start:获取的最小目标 ID
- end:获取的最大目标 ID
- count:获取的数量
- consumer:获取 consumer 的 pending-list
XACK key group ID [ ID … ]
- 功能:确认从组中读取的消息已被处理
- key:队列名称
- group:组名称
- ID:消息的 ID
表格版命令
-
命令
命令 功能 XGROUP CREATE key groupName ID [MKSTREAM] 创建消费者组 XGROUP DESTORY key groupName 删除指定消费者组 XGROUP CREATECONSUMER key groupName consumerName 添加组中消费者 XGROUP DELCONSUMER key groupName consumerName 删除组中消费者 XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 读取组中的消息,ID 填写 “ >
” 则读取第一条未读消息XACK key group ID [ ID … ] 确认从组中读取的消息已被处理 -
属性
属性名 定义 key 队列名称 groupName 消费者组名称 ID 起始 ID 标示,$ 代表队列中最后一个消息,0 代表第一个消息 MKSTREAM 队列不存在时自动创建队列 BLOCK milliseconds 没有消息时的最大等待时长 NOACK 无需手动 ACK,获取到消息后自动确认 STREAMS key 指定队列名称
运行逻辑
while(true) {// 尝试监听队列,使用阻塞模式,最长等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 处理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新读取阻塞队列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null) // 如果阻塞队中的消息已经全部处理则退出pending-listbreak;try {handleMessage(msg); // 重新处理 pending-list 中的消息} catch (Exception e){continue; // 如果还出错, 则继续重新读取}}}
}
四、示例
-
目标:消息队列实现数据库异步修改数据库,将下单 message 缓存在 redis 中,减小下单操作对数据库的冲击
-
项目结构
- RedisConfig 配置类:创建消费者组是一次性的操作,适合放在配置类中
- VoucherOrderHandler 内部类:消费者的逻辑和订单业务相关,因此适合放在 VoucherOrderServiceImpl 中
- 多线程启动逻辑:消费者线程的启动与订单业务密切相关,直接放在 VoucherOrderServiceImpl 类中更符合职责分离原则
src/main/java ├── com/example │ ├── config │ │ └── RedisConfig.java // Redis 配置类,包含消费者组初始化 │ ├── service │ │ ├── VoucherOrderService.java │ │ └── impl │ │ └── VoucherOrderServiceImpl.java // 包含 VoucherOrderHandler 内部类 │ ├── entity │ │ └── VoucherOrder.java // 优惠券订单实体 │ ├── utils │ │ └── BeanUtil.java // 用于 Map 转 Bean 的工具类 │ └── controller │ └── VoucherOrderController.java // 如果有 Controller
-
创建消费者组(config.RedisConfig)
@Bean public void initStreamGroup() {// 检查是否存在消费者组 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,抛出异常,可忽略log.warn("消费者组 g1 已存在");} }
-
创建消费者线程
- 位置:作为 VoucherOrderServiceImpl 内的预构造部分
@PostConstruct public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 个线程,模拟多个消费者new Thread(new VoucherOrderHandler()).start();} }
-
添加消息到消息队列 (src/main/resources/lua/SECKILL_SCRIPT.lua)
--1. 参数列表 --1.1. 优惠券id local voucherId = ARGV[1] --1.2. 用户id local userId = ARGV[2] --1.3. 订单id local orderId = ARGV[3]--2. 数据key local stockKey = 'seckill:stock:' .. voucherId --2.1. 库存key local orderKey = 'seckill:order' .. voucherId --2.2. 订单key--3. 脚本业务 --3.1. 判断库存是否充足 get stockKey if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1 end --3.2. 判断用户是否重复下单 SISMEMBER orderKey userId if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2 end --3.4 扣库存 incrby stockKey -1 redis.call( 'INCRBY', stockKey, -1 ) --3.5 下单(保存用户) sadd orderKey userId redis.call( 'SADD', orderKey, userId ) -- 3.6. 发送消息到队列中 redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
-
创建消费者类(ServiceImpl)
- 位置:作为 VoucherOrderServiceImpl 内的私有类
// 在ServiceImpl中创建一个VoucherOrderHandler消费者类,专门用于处理消息队列中的消息 private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 获取消息队列中的订单信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 没有消息则重新监听if (list == null || list.isEmpty() ) continue;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认当前消息已消费 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("处理订单异常", e);// 6. 处理订单失败则消息会加入pending-list,继续处理pending-listhandlePendingList();}}}// 处理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消费pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), // 消费者此消息的消费者StreamReadOptions.empty().count(1), // StreamOffset.create("stream.order", ReadOffset.from("0")) // 从pending-list的第一条消息开始读);// 2. 退出条件, list 为空 -> pending-list 已全部处理if(list == null || list.isEmpty()) break;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认消息已消费(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理pendding订单异常", e);try{Thread.sleep(20); // 如果发生异常则休眠一会再重新消费pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}} }
-
创建消息方法
- 目标:用户通过这个方法发送一条创建订单的 Message 给 Redis Stream
// 创建Lua脚本对象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua脚本初始化 (通过静态代码块) static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class); }@Override public void createVoucherOrder(Long voucherId, Long userId) {// 生成订单 ID(模拟)long orderId = System.currentTimeMillis();// 执行 Lua 脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根据 Lua 脚本返回结果处理if (result == 1) {throw new RuntimeException("库存不足!");} else if (result == 2) {throw new RuntimeException("不能重复下单!");}// 如果脚本执行成功,则订单消息会进入 Redis Stream,消费者组会自动处理System.out.println("订单创建成功!"); }
(缺陷) 单消费者模式
常用命令
- XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
缺陷
:有消息漏读风险
五、其他消息队列方案
(缺陷) List 实现
优点
- 不受 JVM 内存上限限制:因为利用 Redis 存储
- 数据安全 :因为基于 List 结构本身是数据存储,基于 Redis 持久化机制
- 消息有序性:通过 List 结构的 LPUSH & BRPOP 命令实现顺序
缺点
- 消息丢失:BRPOP 的时候如果宕机则消息会丢失
- 只支持单消费者
(缺陷) PubSub 实现
- 定义
- Publish & Subscribe 模型,一种消息队列模型
- 生产者向指定的 channel 来 public 消息
- 消费者从 subscribe 的 channel 中接收消息
- 功能:支持多消费者模式,多个消费者可以同时 subscribe 一个 channel
- 优点:采用发布订阅模型,支持多生产者、消费者
- 缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
三种消息队列对比
相关文章:

Redis - 消息队列 Stream
一、概述 消息队列 定义 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)生产者:负责 产…...
Docker:国内加速源
阿里云docker加速云: sudo tee /etc/docker/daemon.json <<EOF { “registry-mirrors”: [“https://euf11uji.mirror.aliyuncs.com”] } EOFhttps://docker.mozhu.dev/ sudo tee /etc/docker/daemon.json <<EOF {"registry-mirrors": [&qu…...

Android Studio更改项目使用的JDK
一、吐槽 过去,在安卓项目中配置JDK和Gradle的过程非常直观,只需要进入Android Studio的File菜单中的Project Structure即可进行设置,十分方便。 原本可以在这修改JDK: 但大家都知道,Android Studio的狗屎性能,再加…...

ubuntu+ros新手笔记(四):gazebo无法加载
以下为ChatGPT 的解决方案,对我来说是可行的!! 我按照第2步操作就解决辣!! 我的提问: 在ubuntu 22.04 和ros2 humble环境下,gazebo加载不了 ChatGPT 回答: 在 Ubuntu 22.04 和 …...

vue季度选择器(antd2.0 版本无此控件,单独写一个)
vue季度选择器 效果显示 效果显示 <template><div><a-popoverplacement"bottom"overlayClassName"season-picker"trigger"click"v-model"showSeason"><template #content><div class"season-picker-b…...

C/C++代码性能优化技巧的书籍及资料
使用C/C开发的场景,大多对代码的执行的速度,实时性有较高的要求,像嵌入式系统的开发,资源还受限。在算力存储空间有限的MCU上写出简洁又高效的代码实际是一种艺术。软件工程师在代码设计上的这种差距,会反映在产品的性…...

通俗易懂的 Nginx 反向代理 配置
通俗易懂的 Nginx 反向代理 配置 首先 root 与 alias 的区别 root 是直接拼接 root location location /i/ {root /data/w3; }当请求 /i/top.gif ,/data/w3/i/top.gif 会被返回。 alias 是用 alias 替换 location location /i/ {alias /data/w3/images/; }当请…...
docker设置容器自动启动
说起开机自动启动应该很多人都遇到过,我们公司做的系统很多的中间件都没有设置开机自动启动然后中间修改问题又设置了一些临时生效的文件,开始的时候大家都不以为意,知道公司陆续有人离职入职管理交接一塌糊涂,项目成了历史遗留问…...

蓝桥杯刷题——day1
蓝桥杯刷题——day1 题目一题干题目解析代码 题目二题干题目解析代码 题目一 题干 给定一个字符串 s ,验证 s 是否是 回文串 ,只考虑字母和数字字符,可以忽略字母的大小写。本题中,将空字符串定义为有效的 回文串 。 题目链接&a…...

Leetcode 面试150题 399.除法求值
系列博客目录 文章目录 系列博客目录题目思路代码 题目 链接 思路 广度优先搜索 我们可以将整个问题建模成一张图:给定图中的一些点(点即变量),以及某些边的权值(权值即两个变量的比值),试…...
活动预告 |【Part2】Microsoft 安全在线技术公开课:安全性、合规性和身份基础知识
课程介绍 通过参加“Microsoft 安全在线技术公开课:安全性、合规性和身份基础知识”活动提升你的技能。在本次免费的介绍性活动中,你将获得所需的安全技能和培训,以创造影响力并利用机会推动职业发展。你将了解安全性、合规性和身份的基础知…...

Unity游戏实战
很小的时候在键盘机上玩过一个游戏叫寻秦,最近看有大佬把他的安卓版做出来了,打开封面就是Unity,想自己也尝试一下。...

SQL中的替换函数replace() 使用
这条 SQL 语句的作用是将 tool_tool 表中所有 link 字段包含 https://www.xxspvip.cn 的记录中的 https://www.xxspvip.cn 替换为 http://192.168.1.1。具体解释如下: SQL 语句分解 UPDATE tool_toolSET link REPLACE(link, https://www.xxspvip.cn, http://192.…...
Python面试常见问题及答案5
一、基础语法相关 问题1: Python的可变数据类型和不可变数据类型有哪些? 答案: 在Python中,可变数据类型有列表(list)、字典(dict)、集合(set)。这些数据类型…...

(css)element中el-select下拉框整体样式修改
(css)element中el-select下拉框整体样式修改 重点代码(颜色可行修改) // 修改input默认值颜色 兼容其它主流浏览器 /deep/ input::-webkit-input-placeholder {color: rgba(255, 255, 255, 0.50); } /deep/ input::-moz-input-placeholder {color: rgba…...
点击按钮打开dialog嵌套表格checked数据关闭dialog回显checked数据
介绍:点击按钮打开dialog嵌套表格,勾选数据,点击确认关闭弹窗并且回显选中得数据,回显的数据被删除,dialog里面的数据也被取消勾选,废话不多说 上代码!!! 这里的勾选回显…...

《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三)
《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三) 你好,我是拉依达。 感谢所有阅读关注我的同学支持,目前博客累计阅读 27w,关注1.5w人。其中博客《最全Linux驱动开发全流程详细解析(持续更新)-CSDN博客》已经是 Linux驱动 相关内容搜索的推荐首位,感谢大家支持。 《拉…...
大模型呼出机器人有哪些优势和劣势?
大模型呼出机器人有哪些优势和劣势? 原作者:开源呼叫中心FreeIPCC,其Github:https://github.com/lihaiya/freeipcc 大模型呼出机器人在实际应用中展现出了一系列优势和劣势,以下是对其优势和劣势的详细分析ÿ…...

Python鼠标轨迹算法(游戏防检测)
一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序,它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言,原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势: 模拟…...

安宝特分享 | AR技术助力医院总院与分院间的远程面诊
随着科技的迅猛发展,增强现实(AR)技术在各行各业的应用愈发广泛,特别是在医疗领域,其潜力和价值正在被不断挖掘。在现代医疗环境中,患者常常面临“看病难、看病远、看病急”等诸多挑战,而安宝特…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...

STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...