当前位置: 首页 > news >正文

7、Redis 队列与 Stream

引言

Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。

1. Redis Stream 概述

Stream 是 Redis 5.0 引入的一种新的数据结构,类似于 Kafka 的设计。它是一个强大的支持多播的可持久化消息队列,适用于实时数据处理和消息传递。

基本结构

Stream 由消息链表组成,每个消息都有一个唯一的 ID 和对应的内容。消息 ID 是由时间戳和序号组成的,例如 1527846880572-5,表示在时间戳 1527846880572 时生成的第 5 条消息。每个 Stream 在 Redis 中被存储为一个键值对,其中键是 Stream 的名称,值是消息链表。

持久化

Stream 支持持久化,这意味着即使 Redis 服务器重启,Stream 中的消息仍然会保留。这通过 Redis 的持久化机制 RDB(快照)和 AOF(日志)实现,确保数据的高可用性和可靠性。

多播支持

Stream 支持多个消费组(Consumer Group),每个消费组可以有多个消费者(Consumer)。消费组通过 XGROUP CREATE 命令创建,每个消费组都有一个独立的游标 last_delivered_id,记录已消费的最后一条消息。多个消费者之间是竞争关系,即同一个消费组内的多个消费者会争夺消息的处理权。

2. Stream 的基本操作命令

生产端命令

  1. XADD:XADD 命令用于向 Stream 追加消息。可以指定消息的 ID,或使用 * 让 Redis 自动生成 ID。消息内容是键值对形式。

  2. XDEL:XDEL 命令用于删除 Stream 中的消息。它并不会立即删除消息,而是标记为删除状态。

  3. XRANGE:XRANGE 命令用于获取 Stream 中指定范围的消息。可以通过起始 ID 和结束 ID 指定范围,- 表示最小值,+ 表示最大值。

  4. XLEN:XLEN 命令用于获取 Stream 的长度,即消息数量。

消费端命令

  1. XREAD:XREAD 命令用于读取消息,可以阻塞等待新消息,适用于单消费者模式。

  2. XGROUP:XGROUP 命令用于管理消费组。可以创建、删除消费组,或删除消费组中的消费者。

  3. XREADGROUP:XREADGROUP 命令用于读取消费组的消息。可以指定消费组和消费者,并阻塞等待新消息。

  4. XACK:XACK 命令用于确认消息已被处理,防止消息重复处理。

3. Redis 队列几种实现总结

基于 List 的 LPUSH+BRPOP 实现

这种实现方式简单高效,通过 LPUSH 添加消息,BRPOP 阻塞读取消息。然而,需要处理空闲连接问题,因为长时间阻塞的连接会被服务器断开。此外,ACK 机制较为麻烦,无法保证消息处理成功。

基于 Sorted-Set 的实现

这种实现方式常用于延迟队列,通过 ZADD 添加带有分数的消息,ZRANGEBYSCORE 获取消息。然而,无法阻塞获取消息,需要轮询操作,效率较低。

PUB/SUB,订阅/发布模式

这种实现方式适合即时通讯,通过 PUBLISH 发布消息,SUBSCRIBE 订阅消息。优点是广播模式,一个消息可以同时发送给多个消费者。缺点是消息不可持久化,消费者离线期间的消息会丢失。

基于 Stream 类型的实现

这种实现方式类似消息中间件,适用于中小项目。通过 XADD 添加消息,XREAD 或 XREADGROUP 读取消息。Stream 支持消费组和持久化,具备较高的可靠性。然而,需要管理和监控,确保消息处理的顺利进行。

4. 消息队列问题

Stream 消息太多

当 Stream 中的消息过多时,可以通过 XADD 命令的 MAXLEN 参数设置 Stream 的最大长度,自动删除旧消息,防止内存占用过大。

消息未 ACK

如果消费者未及时确认消息,会导致 pending_ids 列表增长,占用内存。应尽快使用 XACK 命令确认消息,减少 pending_ids 列表的长度。

PEL 防丢失

如果客户端在读取消息后断开连接,未确认的消息会保留在 pending_ids 列表中。客户端重新连接后,可以继续读取这些消息,防止消息丢失。

死信问题

当某条消息长期未被处理时,可以通过 XPENDING 命令查看消息的处理状态,设定消息处理的临界值。达到临界值后,可以将该消息标记为死信(Dead Letter),通过 XDEL 命令删除。

Stream 高可用

Stream 的高可用性基于 Redis 的主从复制机制。通过 Sentinel 或 Cluster 集群环境,Stream 可以实现高可用。然而,在故障转移时,可能会丢失少量数据。

分区 Partition

Redis 服务器本身不支持原生分区功能。如果需要分区,可以在客户端实现,使用不同的 Stream 名称,将消息分片存储在不同的 Stream 中,实现水平扩展。

5. 使用场景

日志收集系统

在分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。

实时监控系统

实时监控系统需要对大量实时数据进行采集和处理。可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。

消息队列服务

在分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。

6. 与其他消息队列的对比

与 Kafka 的对比

  • 设计理念:Kafka 是一个分布式流处理平台,适用于大规模数据流处理和持久化;Stream 则更轻量,适合中小规模的消息队列和数据流处理。
  • 持久化:Kafka 的消息持久化设计更为复杂和可靠,适用于高可用性要求高的场景;Stream 的持久化通过 RDB 和 AOF 实现,相对简单。
  • 分区:Kafka 支持分区,可以水平扩展;Stream 不支持分区,需要通过客户端实现分片。

与 RabbitMQ 的对比

  • 协议支持:RabbitMQ 支持多种协议(如 AMQP、MQTT),适用范围更广;Stream 仅支持 Redis 协议。
  • 持久化:RabbitMQ 的持久化机制更为灵活和可靠;Stream 的持久化通过 Redis 实现,性能和可靠性略逊。
  • 功能特性:RabbitMQ 提供丰富的消息路由、优先级队列等特性;Stream 则更为简单和高效,适合基本的消息队列需求。
7. 实践案例

日志收集系统

在一个分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。

实时监控系统

实时监控系统需要对大量实时数据进行采集和处理,可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。

消息队列服务

在一个分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。

8. 结论

Redis Stream 是 Redis 5.0 引入的一种强大的数据结构,具备持久化、多播、消费组等特性,适用于日志收集、实时监控、消息队列等场景。Stream 提供了灵活的消息处理能力,可以满足复杂的实时数据处理需求。通过对 Stream 的详细介绍和

操作演示,可以看到 Stream 在复杂数据流处理和消息传递中的强大功能。相比其他消息队列,Stream 更为轻量和高效,适合中小规模的应用场景。希望本文能够帮助读者深入理解和应用 Redis Stream,为实际项目提供有力支持。

相关文章:

7、Redis 队列与 Stream

引言 Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。 1. Redis Stre…...

FFT剖析

快速傅里叶变换 (fast Fourier transform) xn{x0,x1,…xn-1} (num:N) 旋转因子系数: d2pik/N 旋转因子 wk(n)(cos(dn)isin(dn)) n[0,N-1] y(k) sum(x(n)wk(n),0,N-1) y(k){y(0),y(1),…y(N-1)} 傅里叶级数 x(n)wk(n)的级数是: 1.d2pik/N 这个系数决…...

git clone报错RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly

问题描述 git clone github上的项目报错: RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly: CANCEL (err 8) 4796 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pac…...

Apispec,一个用于生成 OpenAPI(Swagger)规范的 Python 库

目录 01什么是 Apispec? 为什么选择 Apispec? 安装与配置 02Apispec 的基本用法 生成简单的 API 文档 1、创建 Apispec 实例 2、定义 API 路由和视图 3、添加路径到 Apispec 集成 Flask 和 Apispec 1、安装…...

SpringBoot 自定义异常返回数据格式

Spring Boot 默认异常处理 当我们用 spring boot 开发接口是,当遇到异常时返回的数据格式是如下形式的 {"timestamp": "2024-07-06T02:48:55.79100:00","status": 404,"error": "Not Found","path":…...

【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!!

视频演示 【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!! 1,安装docker方法: #!/bin/shdistribution$(…...

【Unity 3D角色移动】

【Unity 3D角色移动】 在Unity 3D中实现角色移动通常涉及到几个关键步骤,包括设置角色的物理属性、处理输入、更新角色的位置以及动画同步。下面是实现基本3D角色移动的步骤和示例代码: 步骤1:设置角色的物理属性 角色通常使用Character Co…...

个人视角,社会影响力:自媒体的魅力所在

随着数字化时代的到来,自媒体正成为信息传播领域的一场革命。个人视角与社会影响力的结合,赋予了自媒体独特的魅力。在传统媒体受限制的同时,自媒体为每个人提供了表达自己观点和思想的自由。个体的真实视角使得自媒体在信息传播中发挥着重要…...

算法训练营day70

题目1&#xff1a;108. 冗余连接 (kamacoder.com) #include<iostream> #include<vector>using namespace std;int n; vector<int> father(10001, 0);void init() {for(int i 1;i < n;i) father[i] i; }int find(int u) {return u father[u] ? u : fa…...

EtherCAT转Profinet网关配置说明第二讲:上位机软件配置

EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;&#xff0c;不仅可以实现数据之间的通信&#xff0c;还可以实现不同系统之间的数据共享。EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;具有高速传输的特点&#xff0c;因此通…...

日志自动分析-Web---360星图GoaccessALBAnolog

目录 1、Web-360星图(IIS/Apache/Nginx) 2、Web-GoAccess &#xff08;任何自定义日志格式字符串&#xff09; 源码及使用手册 安装goaccess 使用 输出 3-Web-自写脚本&#xff08;任何自定义日志格式字符串&#xff09; 4、Web-机器语言analog&#xff08;任何自定义日…...

【面试八股文】java基础知识

引言 本文是java面试时的一些常见知识点总结归纳和一些拓展&#xff0c;笔者在学习这些内容时&#xff0c;特地整理记录下来&#xff0c;以供大家学习共勉。 一、数据类型 1.1 为什么要设计封装类&#xff0c;Integer和int区别是什么&#xff1f; 使用封装类的目的 对象化:…...

ssrf结合redis未授权getshell

目录 漏洞介绍 SSRF Redis未授权 利用原理 环境搭建 利用过程 rockylinux cron计划任务反弹shell 写公钥免密登录 ubuntu 写公钥免密登录 漏洞介绍 SSRF SSRF&#xff08;server side request forgrey&#xff09;服务端请求伪造&#xff0c;因后端未过滤用户输入&…...

魔法自如:精通 IPython %automagic 命令的切换艺术

魔法自如&#xff1a;精通 IPython %automagic 命令的切换艺术 在 IPython 的神奇世界里&#xff0c;魔术命令是其强大交互功能的核心。这些以 % 或 %% 开头的命令&#xff0c;能够执行一系列特殊的操作&#xff0c;从而增强用户的编程体验。但是&#xff0c;你是否知道&#…...

基于CentOS Stream 9平台搭建MinIO以及开机自启

1. 官网 https://min.io/download?licenseagpl&platformlinux 1.1 下载二进制包 指定目录下载 cd /opt/coisini/ wget https://dl.min.io/server/minio/release/linux-amd64/minio1.2 文件赋权 chmod x /opt/coisini/minio1.3 创建Minio存储数据目录&#xff1a; mkdi…...

shell-awk语法整理

shell-awk语法整理 前言基本语法内置变量1. $02. NF3. NR4. FS5. RS6. OFS7. ORS8. FILENAME9. FNR10. ARGV11. ENVIRON12. IGNORECASE13. RSTART 和 RLENGTH示例解释 内置函数循环语句&#xff08;后面的;可不加&#xff09;条件语句高级特性示例 特殊模式BEGINEND组合示例BEG…...

关于忠诚:忠于自己的良知、理想、信念

关于忠诚&#xff1a; 当我们面对公司、上司、爱人、恋人、合作伙伴还是某件事&#xff0c;会纠结离开还是留下&#xff0c;这里我们要深知忠诚的定义&#xff0c;我们不是忠诚于某个人、某件事、或者某个机构&#xff0c;而是忠诚于自己的良知&#xff0c;忠诚于自己的理想和…...

探索Linux:开源世界的无限可能

Linux是一款开源操作系统&#xff0c;它的起源可以追溯到上世纪90年代初。这个故事始于一个名叫Linus Torvalds的芬兰大学生&#xff0c;他在1983年开始编写一个用于个人电脑的操作系统内核。在他的努力下&#xff0c;Linux逐渐发展成为一个稳定而强大的操作系统。 然而&#…...

深度学习之半监督学习:一文梳理目标检测中的半监督学习策略

什么是半监督目标检测&#xff1f; 传统机器学习根据训练数据集中的标注情况&#xff0c;有着不同的场景&#xff0c;主要包括&#xff1a;监督学习、弱监督学习、弱半监督学习、半监督学习。由于目标检测任务的特殊性&#xff0c;在介绍半监督目标检测方法之前&#xff0c;我…...

Hive 高可用分布式部署详细步骤

目录 系统版本说明 hive安装包下载及解压 上传mysql-connector-java的jar包 配置环境变量 进入conf配置文件中&#xff0c;将文件重命名 在hadoop集群上创建文件夹 创建本地目录 修改hive-site.xml文件 同步到其他的节点服务器 修改node02中的配置 hive-site.xml 修改…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...