7、Redis 队列与 Stream
引言
Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。
1. Redis Stream 概述
Stream 是 Redis 5.0 引入的一种新的数据结构,类似于 Kafka 的设计。它是一个强大的支持多播的可持久化消息队列,适用于实时数据处理和消息传递。
基本结构
Stream 由消息链表组成,每个消息都有一个唯一的 ID 和对应的内容。消息 ID 是由时间戳和序号组成的,例如 1527846880572-5
,表示在时间戳 1527846880572
时生成的第 5 条消息。每个 Stream 在 Redis 中被存储为一个键值对,其中键是 Stream 的名称,值是消息链表。
持久化
Stream 支持持久化,这意味着即使 Redis 服务器重启,Stream 中的消息仍然会保留。这通过 Redis 的持久化机制 RDB(快照)和 AOF(日志)实现,确保数据的高可用性和可靠性。
多播支持
Stream 支持多个消费组(Consumer Group),每个消费组可以有多个消费者(Consumer)。消费组通过 XGROUP CREATE
命令创建,每个消费组都有一个独立的游标 last_delivered_id
,记录已消费的最后一条消息。多个消费者之间是竞争关系,即同一个消费组内的多个消费者会争夺消息的处理权。
2. Stream 的基本操作命令
生产端命令
-
XADD:
XADD
命令用于向 Stream 追加消息。可以指定消息的 ID,或使用*
让 Redis 自动生成 ID。消息内容是键值对形式。 -
XDEL:
XDEL
命令用于删除 Stream 中的消息。它并不会立即删除消息,而是标记为删除状态。 -
XRANGE:
XRANGE
命令用于获取 Stream 中指定范围的消息。可以通过起始 ID 和结束 ID 指定范围,-
表示最小值,+
表示最大值。 -
XLEN:
XLEN
命令用于获取 Stream 的长度,即消息数量。
消费端命令
-
XREAD:
XREAD
命令用于读取消息,可以阻塞等待新消息,适用于单消费者模式。 -
XGROUP:
XGROUP
命令用于管理消费组。可以创建、删除消费组,或删除消费组中的消费者。 -
XREADGROUP:
XREADGROUP
命令用于读取消费组的消息。可以指定消费组和消费者,并阻塞等待新消息。 -
XACK:
XACK
命令用于确认消息已被处理,防止消息重复处理。
3. Redis 队列几种实现总结
基于 List 的 LPUSH+BRPOP 实现
这种实现方式简单高效,通过 LPUSH
添加消息,BRPOP
阻塞读取消息。然而,需要处理空闲连接问题,因为长时间阻塞的连接会被服务器断开。此外,ACK 机制较为麻烦,无法保证消息处理成功。
基于 Sorted-Set 的实现
这种实现方式常用于延迟队列,通过 ZADD
添加带有分数的消息,ZRANGEBYSCORE
获取消息。然而,无法阻塞获取消息,需要轮询操作,效率较低。
PUB/SUB,订阅/发布模式
这种实现方式适合即时通讯,通过 PUBLISH
发布消息,SUBSCRIBE
订阅消息。优点是广播模式,一个消息可以同时发送给多个消费者。缺点是消息不可持久化,消费者离线期间的消息会丢失。
基于 Stream 类型的实现
这种实现方式类似消息中间件,适用于中小项目。通过 XADD
添加消息,XREAD
或 XREADGROUP
读取消息。Stream 支持消费组和持久化,具备较高的可靠性。然而,需要管理和监控,确保消息处理的顺利进行。
4. 消息队列问题
Stream 消息太多
当 Stream 中的消息过多时,可以通过 XADD
命令的 MAXLEN
参数设置 Stream 的最大长度,自动删除旧消息,防止内存占用过大。
消息未 ACK
如果消费者未及时确认消息,会导致 pending_ids
列表增长,占用内存。应尽快使用 XACK
命令确认消息,减少 pending_ids
列表的长度。
PEL 防丢失
如果客户端在读取消息后断开连接,未确认的消息会保留在 pending_ids
列表中。客户端重新连接后,可以继续读取这些消息,防止消息丢失。
死信问题
当某条消息长期未被处理时,可以通过 XPENDING
命令查看消息的处理状态,设定消息处理的临界值。达到临界值后,可以将该消息标记为死信(Dead Letter),通过 XDEL
命令删除。
Stream 高可用
Stream 的高可用性基于 Redis 的主从复制机制。通过 Sentinel 或 Cluster 集群环境,Stream 可以实现高可用。然而,在故障转移时,可能会丢失少量数据。
分区 Partition
Redis 服务器本身不支持原生分区功能。如果需要分区,可以在客户端实现,使用不同的 Stream 名称,将消息分片存储在不同的 Stream 中,实现水平扩展。
5. 使用场景
日志收集系统
在分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。
实时监控系统
实时监控系统需要对大量实时数据进行采集和处理。可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。
消息队列服务
在分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。
6. 与其他消息队列的对比
与 Kafka 的对比
- 设计理念:Kafka 是一个分布式流处理平台,适用于大规模数据流处理和持久化;Stream 则更轻量,适合中小规模的消息队列和数据流处理。
- 持久化:Kafka 的消息持久化设计更为复杂和可靠,适用于高可用性要求高的场景;Stream 的持久化通过 RDB 和 AOF 实现,相对简单。
- 分区:Kafka 支持分区,可以水平扩展;Stream 不支持分区,需要通过客户端实现分片。
与 RabbitMQ 的对比
- 协议支持:RabbitMQ 支持多种协议(如 AMQP、MQTT),适用范围更广;Stream 仅支持 Redis 协议。
- 持久化:RabbitMQ 的持久化机制更为灵活和可靠;Stream 的持久化通过 Redis 实现,性能和可靠性略逊。
- 功能特性:RabbitMQ 提供丰富的消息路由、优先级队列等特性;Stream 则更为简单和高效,适合基本的消息队列需求。
7. 实践案例
日志收集系统
在一个分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。
实时监控系统
实时监控系统需要对大量实时数据进行采集和处理,可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。
消息队列服务
在一个分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。
8. 结论
Redis Stream 是 Redis 5.0 引入的一种强大的数据结构,具备持久化、多播、消费组等特性,适用于日志收集、实时监控、消息队列等场景。Stream 提供了灵活的消息处理能力,可以满足复杂的实时数据处理需求。通过对 Stream 的详细介绍和
操作演示,可以看到 Stream 在复杂数据流处理和消息传递中的强大功能。相比其他消息队列,Stream 更为轻量和高效,适合中小规模的应用场景。希望本文能够帮助读者深入理解和应用 Redis Stream,为实际项目提供有力支持。
相关文章:

7、Redis 队列与 Stream
引言 Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。 1. Redis Stre…...

FFT剖析
快速傅里叶变换 (fast Fourier transform) xn{x0,x1,…xn-1} (num:N) 旋转因子系数: d2pik/N 旋转因子 wk(n)(cos(dn)isin(dn)) n[0,N-1] y(k) sum(x(n)wk(n),0,N-1) y(k){y(0),y(1),…y(N-1)} 傅里叶级数 x(n)wk(n)的级数是: 1.d2pik/N 这个系数决…...

git clone报错RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly
问题描述 git clone github上的项目报错: RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly: CANCEL (err 8) 4796 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pac…...

Apispec,一个用于生成 OpenAPI(Swagger)规范的 Python 库
目录 01什么是 Apispec? 为什么选择 Apispec? 安装与配置 02Apispec 的基本用法 生成简单的 API 文档 1、创建 Apispec 实例 2、定义 API 路由和视图 3、添加路径到 Apispec 集成 Flask 和 Apispec 1、安装…...

SpringBoot 自定义异常返回数据格式
Spring Boot 默认异常处理 当我们用 spring boot 开发接口是,当遇到异常时返回的数据格式是如下形式的 {"timestamp": "2024-07-06T02:48:55.79100:00","status": 404,"error": "Not Found","path":…...

【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!!
视频演示 【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!! 1,安装docker方法: #!/bin/shdistribution$(…...

【Unity 3D角色移动】
【Unity 3D角色移动】 在Unity 3D中实现角色移动通常涉及到几个关键步骤,包括设置角色的物理属性、处理输入、更新角色的位置以及动画同步。下面是实现基本3D角色移动的步骤和示例代码: 步骤1:设置角色的物理属性 角色通常使用Character Co…...

个人视角,社会影响力:自媒体的魅力所在
随着数字化时代的到来,自媒体正成为信息传播领域的一场革命。个人视角与社会影响力的结合,赋予了自媒体独特的魅力。在传统媒体受限制的同时,自媒体为每个人提供了表达自己观点和思想的自由。个体的真实视角使得自媒体在信息传播中发挥着重要…...

算法训练营day70
题目1:108. 冗余连接 (kamacoder.com) #include<iostream> #include<vector>using namespace std;int n; vector<int> father(10001, 0);void init() {for(int i 1;i < n;i) father[i] i; }int find(int u) {return u father[u] ? u : fa…...

EtherCAT转Profinet网关配置说明第二讲:上位机软件配置
EtherCAT协议转Profinet协议网关模块(XD-ECPNS20),不仅可以实现数据之间的通信,还可以实现不同系统之间的数据共享。EtherCAT协议转Profinet协议网关模块(XD-ECPNS20)具有高速传输的特点,因此通…...

日志自动分析-Web---360星图GoaccessALBAnolog
目录 1、Web-360星图(IIS/Apache/Nginx) 2、Web-GoAccess (任何自定义日志格式字符串) 源码及使用手册 安装goaccess 使用 输出 3-Web-自写脚本(任何自定义日志格式字符串) 4、Web-机器语言analog(任何自定义日…...

【面试八股文】java基础知识
引言 本文是java面试时的一些常见知识点总结归纳和一些拓展,笔者在学习这些内容时,特地整理记录下来,以供大家学习共勉。 一、数据类型 1.1 为什么要设计封装类,Integer和int区别是什么? 使用封装类的目的 对象化:…...

ssrf结合redis未授权getshell
目录 漏洞介绍 SSRF Redis未授权 利用原理 环境搭建 利用过程 rockylinux cron计划任务反弹shell 写公钥免密登录 ubuntu 写公钥免密登录 漏洞介绍 SSRF SSRF(server side request forgrey)服务端请求伪造,因后端未过滤用户输入&…...

魔法自如:精通 IPython %automagic 命令的切换艺术
魔法自如:精通 IPython %automagic 命令的切换艺术 在 IPython 的神奇世界里,魔术命令是其强大交互功能的核心。这些以 % 或 %% 开头的命令,能够执行一系列特殊的操作,从而增强用户的编程体验。但是,你是否知道&#…...

基于CentOS Stream 9平台搭建MinIO以及开机自启
1. 官网 https://min.io/download?licenseagpl&platformlinux 1.1 下载二进制包 指定目录下载 cd /opt/coisini/ wget https://dl.min.io/server/minio/release/linux-amd64/minio1.2 文件赋权 chmod x /opt/coisini/minio1.3 创建Minio存储数据目录: mkdi…...

shell-awk语法整理
shell-awk语法整理 前言基本语法内置变量1. $02. NF3. NR4. FS5. RS6. OFS7. ORS8. FILENAME9. FNR10. ARGV11. ENVIRON12. IGNORECASE13. RSTART 和 RLENGTH示例解释 内置函数循环语句(后面的;可不加)条件语句高级特性示例 特殊模式BEGINEND组合示例BEG…...

关于忠诚:忠于自己的良知、理想、信念
关于忠诚: 当我们面对公司、上司、爱人、恋人、合作伙伴还是某件事,会纠结离开还是留下,这里我们要深知忠诚的定义,我们不是忠诚于某个人、某件事、或者某个机构,而是忠诚于自己的良知,忠诚于自己的理想和…...

探索Linux:开源世界的无限可能
Linux是一款开源操作系统,它的起源可以追溯到上世纪90年代初。这个故事始于一个名叫Linus Torvalds的芬兰大学生,他在1983年开始编写一个用于个人电脑的操作系统内核。在他的努力下,Linux逐渐发展成为一个稳定而强大的操作系统。 然而&#…...

深度学习之半监督学习:一文梳理目标检测中的半监督学习策略
什么是半监督目标检测? 传统机器学习根据训练数据集中的标注情况,有着不同的场景,主要包括:监督学习、弱监督学习、弱半监督学习、半监督学习。由于目标检测任务的特殊性,在介绍半监督目标检测方法之前,我…...

Hive 高可用分布式部署详细步骤
目录 系统版本说明 hive安装包下载及解压 上传mysql-connector-java的jar包 配置环境变量 进入conf配置文件中,将文件重命名 在hadoop集群上创建文件夹 创建本地目录 修改hive-site.xml文件 同步到其他的节点服务器 修改node02中的配置 hive-site.xml 修改…...

ubuntu下运行程序时提示缺库问题的有效解决方法
目录 一、问题现象二、解决方式三、总结 一、问题现象 当我们平时在ubuntu上运行一个程序时时长会遇到如下情况,含义为本机缺少执行程序需要的库 这时候我们可能会根据缺少的库使用apt install 库名的模糊名字 进行安装,然后再去运行,此时可…...

GNU/Linux - wic文件的使用
Yocto/OpenEmbedded使用的磁盘镜像格式是 wic。为嵌入式系统提供 bootable images。 The disk image format used in the Yocto Project is wic. .wic 文件显然只是一个带有分区表和分区的磁盘镜像,就像下载 Linux 发行版时获得的所有 .img 文件一样。这就是为什么你…...

前端JS 插件实现下载【js-tool-big-box,下载大文件(fetch请求 + 下载功能版)
上一节,我们添加了下载大文件的纯功能版,意思就是需要开发者,在自己项目里发送请求,请求成功后,获取文件流的blob数据,然后 js-tool-big-box 帮助下载。 但考虑到,有些项目,可能比较…...

JVM专题之垃圾收集器
JVM参数 3.1.1 标准参数 -version -help -server -cp 3.1.2 -X参数 非标准参数,也就是在JDK各个版本中可能会变动 ``` -Xint 解释执行 -Xcomp 第一次使用就编译成本地代码 -Xmixed 混合模式,JVM自己来决定 3.1.3 -XX参数 > 使用得最多的参数类型 > > 非…...

SSM养老院管理系统-计算机毕业设计源码02221
摘要 本篇论文旨在设计和实现一个基于SSM的养老院管理系统,旨在提供高效、便捷的养老院管理服务。该系统将包括老人档案信息管理、护工人员管理、房间信息管理、费用管理等功能模块,以满足养老院管理者和居民的不同需求。 通过引入SSM框架&#x…...

使用Keil将STM32部分程序放在RAM中运行
手动分配RAM区域,新建.sct文件,定义RAM_CODE区域,并指定其正确的起始地址和大小。 ; ************************************************************* ; *** Scatter-Loading Description File generated by uVision *** ; ************************************************…...

【MySQL8.0】 CentOS8.0下安装mysql报错权限问题的记录
这里写自定义目录标题 基本信息问题记录 基本信息 OS: Linux server-02 4.18.0-240.el8.x86_64 #1 SMP Fri Sep 25 19:48:47 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux MySQL: 8.0 问题记录 缺少类库 mysql: error while loading shared libraries: libncurses.so.5: cannot…...

在内网互通的服务器中自由跳转与数据管理
在服务器中自由跳转与数据管理:实用命令指南 在管理或使用集群服务器环境时,高效地在不同节点间跳转、执行命令以及数据的相互拷贝是日常操作的重要组成部分。 1. 在集群节点间自由跳转:SSH(Secure Shell) SSH 是实…...

Arcgis Api 三维聚合支持最新版API
Arcgis Api 三维聚合支持最新版API 最近有同学问我Arcgis api 三维聚合,官方还不支持三维聚合API,二维可以。所以依旧是通过GraphicLayers 类来实现,可支持最新Arcgis Api版本 效果图:...

在Conda环境中高效使用Kubernetes:跨平台容器化实践指南
摘要 Conda 是一个流行的跨平台包和环境管理器,广泛用于Python社区。而 Kubernetes 是一个开源的容器编排系统,用于自动化部署、扩展和管理容器化应用程序。本文将探讨如何在 Conda 环境中使用 Kubernetes,包括设置 Conda 环境、容器化应用程…...