7、Redis 队列与 Stream
引言
Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。
1. Redis Stream 概述
Stream 是 Redis 5.0 引入的一种新的数据结构,类似于 Kafka 的设计。它是一个强大的支持多播的可持久化消息队列,适用于实时数据处理和消息传递。
基本结构
Stream 由消息链表组成,每个消息都有一个唯一的 ID 和对应的内容。消息 ID 是由时间戳和序号组成的,例如 1527846880572-5,表示在时间戳 1527846880572 时生成的第 5 条消息。每个 Stream 在 Redis 中被存储为一个键值对,其中键是 Stream 的名称,值是消息链表。
持久化
Stream 支持持久化,这意味着即使 Redis 服务器重启,Stream 中的消息仍然会保留。这通过 Redis 的持久化机制 RDB(快照)和 AOF(日志)实现,确保数据的高可用性和可靠性。
多播支持
Stream 支持多个消费组(Consumer Group),每个消费组可以有多个消费者(Consumer)。消费组通过 XGROUP CREATE 命令创建,每个消费组都有一个独立的游标 last_delivered_id,记录已消费的最后一条消息。多个消费者之间是竞争关系,即同一个消费组内的多个消费者会争夺消息的处理权。
2. Stream 的基本操作命令
生产端命令
-
XADD:
XADD命令用于向 Stream 追加消息。可以指定消息的 ID,或使用*让 Redis 自动生成 ID。消息内容是键值对形式。 -
XDEL:
XDEL命令用于删除 Stream 中的消息。它并不会立即删除消息,而是标记为删除状态。 -
XRANGE:
XRANGE命令用于获取 Stream 中指定范围的消息。可以通过起始 ID 和结束 ID 指定范围,-表示最小值,+表示最大值。 -
XLEN:
XLEN命令用于获取 Stream 的长度,即消息数量。
消费端命令
-
XREAD:
XREAD命令用于读取消息,可以阻塞等待新消息,适用于单消费者模式。 -
XGROUP:
XGROUP命令用于管理消费组。可以创建、删除消费组,或删除消费组中的消费者。 -
XREADGROUP:
XREADGROUP命令用于读取消费组的消息。可以指定消费组和消费者,并阻塞等待新消息。 -
XACK:
XACK命令用于确认消息已被处理,防止消息重复处理。
3. Redis 队列几种实现总结
基于 List 的 LPUSH+BRPOP 实现
这种实现方式简单高效,通过 LPUSH 添加消息,BRPOP 阻塞读取消息。然而,需要处理空闲连接问题,因为长时间阻塞的连接会被服务器断开。此外,ACK 机制较为麻烦,无法保证消息处理成功。
基于 Sorted-Set 的实现
这种实现方式常用于延迟队列,通过 ZADD 添加带有分数的消息,ZRANGEBYSCORE 获取消息。然而,无法阻塞获取消息,需要轮询操作,效率较低。
PUB/SUB,订阅/发布模式
这种实现方式适合即时通讯,通过 PUBLISH 发布消息,SUBSCRIBE 订阅消息。优点是广播模式,一个消息可以同时发送给多个消费者。缺点是消息不可持久化,消费者离线期间的消息会丢失。
基于 Stream 类型的实现
这种实现方式类似消息中间件,适用于中小项目。通过 XADD 添加消息,XREAD 或 XREADGROUP 读取消息。Stream 支持消费组和持久化,具备较高的可靠性。然而,需要管理和监控,确保消息处理的顺利进行。
4. 消息队列问题
Stream 消息太多
当 Stream 中的消息过多时,可以通过 XADD 命令的 MAXLEN 参数设置 Stream 的最大长度,自动删除旧消息,防止内存占用过大。
消息未 ACK
如果消费者未及时确认消息,会导致 pending_ids 列表增长,占用内存。应尽快使用 XACK 命令确认消息,减少 pending_ids 列表的长度。
PEL 防丢失
如果客户端在读取消息后断开连接,未确认的消息会保留在 pending_ids 列表中。客户端重新连接后,可以继续读取这些消息,防止消息丢失。
死信问题
当某条消息长期未被处理时,可以通过 XPENDING 命令查看消息的处理状态,设定消息处理的临界值。达到临界值后,可以将该消息标记为死信(Dead Letter),通过 XDEL 命令删除。
Stream 高可用
Stream 的高可用性基于 Redis 的主从复制机制。通过 Sentinel 或 Cluster 集群环境,Stream 可以实现高可用。然而,在故障转移时,可能会丢失少量数据。
分区 Partition
Redis 服务器本身不支持原生分区功能。如果需要分区,可以在客户端实现,使用不同的 Stream 名称,将消息分片存储在不同的 Stream 中,实现水平扩展。
5. 使用场景
日志收集系统
在分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。
实时监控系统
实时监控系统需要对大量实时数据进行采集和处理。可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。
消息队列服务
在分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。
6. 与其他消息队列的对比
与 Kafka 的对比
- 设计理念:Kafka 是一个分布式流处理平台,适用于大规模数据流处理和持久化;Stream 则更轻量,适合中小规模的消息队列和数据流处理。
- 持久化:Kafka 的消息持久化设计更为复杂和可靠,适用于高可用性要求高的场景;Stream 的持久化通过 RDB 和 AOF 实现,相对简单。
- 分区:Kafka 支持分区,可以水平扩展;Stream 不支持分区,需要通过客户端实现分片。
与 RabbitMQ 的对比
- 协议支持:RabbitMQ 支持多种协议(如 AMQP、MQTT),适用范围更广;Stream 仅支持 Redis 协议。
- 持久化:RabbitMQ 的持久化机制更为灵活和可靠;Stream 的持久化通过 Redis 实现,性能和可靠性略逊。
- 功能特性:RabbitMQ 提供丰富的消息路由、优先级队列等特性;Stream 则更为简单和高效,适合基本的消息队列需求。
7. 实践案例
日志收集系统
在一个分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。
实时监控系统
实时监控系统需要对大量实时数据进行采集和处理,可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。
消息队列服务
在一个分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。
8. 结论
Redis Stream 是 Redis 5.0 引入的一种强大的数据结构,具备持久化、多播、消费组等特性,适用于日志收集、实时监控、消息队列等场景。Stream 提供了灵活的消息处理能力,可以满足复杂的实时数据处理需求。通过对 Stream 的详细介绍和
操作演示,可以看到 Stream 在复杂数据流处理和消息传递中的强大功能。相比其他消息队列,Stream 更为轻量和高效,适合中小规模的应用场景。希望本文能够帮助读者深入理解和应用 Redis Stream,为实际项目提供有力支持。
相关文章:
7、Redis 队列与 Stream
引言 Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。 1. Redis Stre…...
FFT剖析
快速傅里叶变换 (fast Fourier transform) xn{x0,x1,…xn-1} (num:N) 旋转因子系数: d2pik/N 旋转因子 wk(n)(cos(dn)isin(dn)) n[0,N-1] y(k) sum(x(n)wk(n),0,N-1) y(k){y(0),y(1),…y(N-1)} 傅里叶级数 x(n)wk(n)的级数是: 1.d2pik/N 这个系数决…...
git clone报错RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly
问题描述 git clone github上的项目报错: RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly: CANCEL (err 8) 4796 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pac…...
Apispec,一个用于生成 OpenAPI(Swagger)规范的 Python 库
目录 01什么是 Apispec? 为什么选择 Apispec? 安装与配置 02Apispec 的基本用法 生成简单的 API 文档 1、创建 Apispec 实例 2、定义 API 路由和视图 3、添加路径到 Apispec 集成 Flask 和 Apispec 1、安装…...
SpringBoot 自定义异常返回数据格式
Spring Boot 默认异常处理 当我们用 spring boot 开发接口是,当遇到异常时返回的数据格式是如下形式的 {"timestamp": "2024-07-06T02:48:55.79100:00","status": 404,"error": "Not Found","path":…...
【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!!
视频演示 【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!! 1,安装docker方法: #!/bin/shdistribution$(…...
【Unity 3D角色移动】
【Unity 3D角色移动】 在Unity 3D中实现角色移动通常涉及到几个关键步骤,包括设置角色的物理属性、处理输入、更新角色的位置以及动画同步。下面是实现基本3D角色移动的步骤和示例代码: 步骤1:设置角色的物理属性 角色通常使用Character Co…...
个人视角,社会影响力:自媒体的魅力所在
随着数字化时代的到来,自媒体正成为信息传播领域的一场革命。个人视角与社会影响力的结合,赋予了自媒体独特的魅力。在传统媒体受限制的同时,自媒体为每个人提供了表达自己观点和思想的自由。个体的真实视角使得自媒体在信息传播中发挥着重要…...
算法训练营day70
题目1:108. 冗余连接 (kamacoder.com) #include<iostream> #include<vector>using namespace std;int n; vector<int> father(10001, 0);void init() {for(int i 1;i < n;i) father[i] i; }int find(int u) {return u father[u] ? u : fa…...
EtherCAT转Profinet网关配置说明第二讲:上位机软件配置
EtherCAT协议转Profinet协议网关模块(XD-ECPNS20),不仅可以实现数据之间的通信,还可以实现不同系统之间的数据共享。EtherCAT协议转Profinet协议网关模块(XD-ECPNS20)具有高速传输的特点,因此通…...
日志自动分析-Web---360星图GoaccessALBAnolog
目录 1、Web-360星图(IIS/Apache/Nginx) 2、Web-GoAccess (任何自定义日志格式字符串) 源码及使用手册 安装goaccess 使用 输出 3-Web-自写脚本(任何自定义日志格式字符串) 4、Web-机器语言analog(任何自定义日…...
【面试八股文】java基础知识
引言 本文是java面试时的一些常见知识点总结归纳和一些拓展,笔者在学习这些内容时,特地整理记录下来,以供大家学习共勉。 一、数据类型 1.1 为什么要设计封装类,Integer和int区别是什么? 使用封装类的目的 对象化:…...
ssrf结合redis未授权getshell
目录 漏洞介绍 SSRF Redis未授权 利用原理 环境搭建 利用过程 rockylinux cron计划任务反弹shell 写公钥免密登录 ubuntu 写公钥免密登录 漏洞介绍 SSRF SSRF(server side request forgrey)服务端请求伪造,因后端未过滤用户输入&…...
魔法自如:精通 IPython %automagic 命令的切换艺术
魔法自如:精通 IPython %automagic 命令的切换艺术 在 IPython 的神奇世界里,魔术命令是其强大交互功能的核心。这些以 % 或 %% 开头的命令,能够执行一系列特殊的操作,从而增强用户的编程体验。但是,你是否知道&#…...
基于CentOS Stream 9平台搭建MinIO以及开机自启
1. 官网 https://min.io/download?licenseagpl&platformlinux 1.1 下载二进制包 指定目录下载 cd /opt/coisini/ wget https://dl.min.io/server/minio/release/linux-amd64/minio1.2 文件赋权 chmod x /opt/coisini/minio1.3 创建Minio存储数据目录: mkdi…...
shell-awk语法整理
shell-awk语法整理 前言基本语法内置变量1. $02. NF3. NR4. FS5. RS6. OFS7. ORS8. FILENAME9. FNR10. ARGV11. ENVIRON12. IGNORECASE13. RSTART 和 RLENGTH示例解释 内置函数循环语句(后面的;可不加)条件语句高级特性示例 特殊模式BEGINEND组合示例BEG…...
关于忠诚:忠于自己的良知、理想、信念
关于忠诚: 当我们面对公司、上司、爱人、恋人、合作伙伴还是某件事,会纠结离开还是留下,这里我们要深知忠诚的定义,我们不是忠诚于某个人、某件事、或者某个机构,而是忠诚于自己的良知,忠诚于自己的理想和…...
探索Linux:开源世界的无限可能
Linux是一款开源操作系统,它的起源可以追溯到上世纪90年代初。这个故事始于一个名叫Linus Torvalds的芬兰大学生,他在1983年开始编写一个用于个人电脑的操作系统内核。在他的努力下,Linux逐渐发展成为一个稳定而强大的操作系统。 然而&#…...
深度学习之半监督学习:一文梳理目标检测中的半监督学习策略
什么是半监督目标检测? 传统机器学习根据训练数据集中的标注情况,有着不同的场景,主要包括:监督学习、弱监督学习、弱半监督学习、半监督学习。由于目标检测任务的特殊性,在介绍半监督目标检测方法之前,我…...
Hive 高可用分布式部署详细步骤
目录 系统版本说明 hive安装包下载及解压 上传mysql-connector-java的jar包 配置环境变量 进入conf配置文件中,将文件重命名 在hadoop集群上创建文件夹 创建本地目录 修改hive-site.xml文件 同步到其他的节点服务器 修改node02中的配置 hive-site.xml 修改…...
OpenClaw多模型切换:Qwen2.5-VL-7B与文本模型协同工作
OpenClaw多模型切换:Qwen2.5-VL-7B与文本模型协同工作 1. 为什么需要多模型协同 去年夏天,当我第一次尝试用OpenClaw自动化处理团队的知识库文档时,遇到了一个棘手的问题:有些文档包含大量截图和图表说明,而纯文本模…...
STM32远程固件升级(FOTA)实现方案详解
1. STM32远程升级方案概述在嵌入式设备开发中,远程固件升级(FOTA)是一项至关重要的功能。当设备部署在难以物理接触的场所时,通过无线或有线方式实现固件更新可以大幅降低维护成本。STM32系列单片机凭借其灵活的存储布局和丰富的通信接口,非常…...
2026最权威的十大降AI率平台实测分析
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 此刻,AI生成那种内容的检测变得越发严格起来,降AI工具就顺势产生了&a…...
利用drawio高效绘制数据库ER图:从入门到精通
1. 为什么选择drawio绘制数据库ER图 第一次接触数据库设计时,我被各种专业工具的价格和复杂度吓退了。直到发现drawio这个宝藏工具,才真正体会到什么叫"用最简单的工具做专业的事"。作为一款完全免费的跨平台工具,drawio不仅支持网…...
Unity WebGL小游戏上抖音,从踩坑到上线:一份避坑指南与性能优化清单
Unity WebGL小游戏上抖音:性能优化与避坑实战手册 当你第一次将Unity WebGL小游戏发布到抖音平台时,可能会遇到各种意想不到的性能瓶颈和兼容性问题。iOS设备上的内存限制、WebGL与Native的性能差距、包体大小控制等挑战,都可能让原本流畅的游…...
PrimeTime实战指南:从基础STA流程到精准时序报告解析
1. PrimeTime与静态时序分析基础 刚接触PrimeTime时,我和大多数工程师一样被满屏的时序报告搞得头晕眼花。直到把整个设计流程跑通三遍后,才真正理解这个工具的价值。PrimeTime(简称PT)是Synopsys推出的静态时序分析黄金工具&…...
电磁波衰减在气象雷达探测中的关键影响与优化策略
1. 电磁波衰减如何影响气象雷达的"视力" 想象一下你戴着沾满雨滴的眼镜看世界——视线模糊、细节丢失、距离判断失误。气象雷达遭遇电磁波衰减时,面临的正是类似的困境。当雷达发射的电磁波穿越雨雪云层时,能量就像被层层"抽成"&…...
直流电机双闭环调速控制系统仿真模型 转速电流双闭环PI控制 Matlab/Simulink仿真模型 带报告
直流电机双闭环调速控制系统仿真模型 转速电流双闭环PI控制 Matlab/Simulink仿真模型 带报告在 Simulink 里搭建直流电机双闭环调速系统,而是通过连接模块来实现。这段代码会自动计算 PI 控制器的参数,DC_Motor_Dual_Loop 的仿真模型。 🛠️ …...
一键生成专业工资条:工资条生成器功能详解
在当今数字化办公的时代,一款好的工具能够让工作效率得到质的提升。 工资条生成器就是这样一款专门为财务人员打造的专业工具,它集成了多项实用功能。 下面,就让我们来详细了解一下这款软件的各项功能特性。 首先要介绍的是软件的核心功能…...
Comate vs. Cursor:国产AI IDE如何以多智能体协同重塑开发体验?
1. Comate与Cursor:AI IDE赛道的双雄对决 当代码补全插件已经无法满足开发者的需求时,AI原生IDE正在掀起一场开发工具的革命。在这场变革中,百度的Comate和Cursor成为了最受关注的两个选手。作为一个长期使用各类开发工具的老码农,…...
