当前位置: 首页 > news >正文

7、Redis 队列与 Stream

引言

Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。

1. Redis Stream 概述

Stream 是 Redis 5.0 引入的一种新的数据结构,类似于 Kafka 的设计。它是一个强大的支持多播的可持久化消息队列,适用于实时数据处理和消息传递。

基本结构

Stream 由消息链表组成,每个消息都有一个唯一的 ID 和对应的内容。消息 ID 是由时间戳和序号组成的,例如 1527846880572-5,表示在时间戳 1527846880572 时生成的第 5 条消息。每个 Stream 在 Redis 中被存储为一个键值对,其中键是 Stream 的名称,值是消息链表。

持久化

Stream 支持持久化,这意味着即使 Redis 服务器重启,Stream 中的消息仍然会保留。这通过 Redis 的持久化机制 RDB(快照)和 AOF(日志)实现,确保数据的高可用性和可靠性。

多播支持

Stream 支持多个消费组(Consumer Group),每个消费组可以有多个消费者(Consumer)。消费组通过 XGROUP CREATE 命令创建,每个消费组都有一个独立的游标 last_delivered_id,记录已消费的最后一条消息。多个消费者之间是竞争关系,即同一个消费组内的多个消费者会争夺消息的处理权。

2. Stream 的基本操作命令

生产端命令

  1. XADD:XADD 命令用于向 Stream 追加消息。可以指定消息的 ID,或使用 * 让 Redis 自动生成 ID。消息内容是键值对形式。

  2. XDEL:XDEL 命令用于删除 Stream 中的消息。它并不会立即删除消息,而是标记为删除状态。

  3. XRANGE:XRANGE 命令用于获取 Stream 中指定范围的消息。可以通过起始 ID 和结束 ID 指定范围,- 表示最小值,+ 表示最大值。

  4. XLEN:XLEN 命令用于获取 Stream 的长度,即消息数量。

消费端命令

  1. XREAD:XREAD 命令用于读取消息,可以阻塞等待新消息,适用于单消费者模式。

  2. XGROUP:XGROUP 命令用于管理消费组。可以创建、删除消费组,或删除消费组中的消费者。

  3. XREADGROUP:XREADGROUP 命令用于读取消费组的消息。可以指定消费组和消费者,并阻塞等待新消息。

  4. XACK:XACK 命令用于确认消息已被处理,防止消息重复处理。

3. Redis 队列几种实现总结

基于 List 的 LPUSH+BRPOP 实现

这种实现方式简单高效,通过 LPUSH 添加消息,BRPOP 阻塞读取消息。然而,需要处理空闲连接问题,因为长时间阻塞的连接会被服务器断开。此外,ACK 机制较为麻烦,无法保证消息处理成功。

基于 Sorted-Set 的实现

这种实现方式常用于延迟队列,通过 ZADD 添加带有分数的消息,ZRANGEBYSCORE 获取消息。然而,无法阻塞获取消息,需要轮询操作,效率较低。

PUB/SUB,订阅/发布模式

这种实现方式适合即时通讯,通过 PUBLISH 发布消息,SUBSCRIBE 订阅消息。优点是广播模式,一个消息可以同时发送给多个消费者。缺点是消息不可持久化,消费者离线期间的消息会丢失。

基于 Stream 类型的实现

这种实现方式类似消息中间件,适用于中小项目。通过 XADD 添加消息,XREAD 或 XREADGROUP 读取消息。Stream 支持消费组和持久化,具备较高的可靠性。然而,需要管理和监控,确保消息处理的顺利进行。

4. 消息队列问题

Stream 消息太多

当 Stream 中的消息过多时,可以通过 XADD 命令的 MAXLEN 参数设置 Stream 的最大长度,自动删除旧消息,防止内存占用过大。

消息未 ACK

如果消费者未及时确认消息,会导致 pending_ids 列表增长,占用内存。应尽快使用 XACK 命令确认消息,减少 pending_ids 列表的长度。

PEL 防丢失

如果客户端在读取消息后断开连接,未确认的消息会保留在 pending_ids 列表中。客户端重新连接后,可以继续读取这些消息,防止消息丢失。

死信问题

当某条消息长期未被处理时,可以通过 XPENDING 命令查看消息的处理状态,设定消息处理的临界值。达到临界值后,可以将该消息标记为死信(Dead Letter),通过 XDEL 命令删除。

Stream 高可用

Stream 的高可用性基于 Redis 的主从复制机制。通过 Sentinel 或 Cluster 集群环境,Stream 可以实现高可用。然而,在故障转移时,可能会丢失少量数据。

分区 Partition

Redis 服务器本身不支持原生分区功能。如果需要分区,可以在客户端实现,使用不同的 Stream 名称,将消息分片存储在不同的 Stream 中,实现水平扩展。

5. 使用场景

日志收集系统

在分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。

实时监控系统

实时监控系统需要对大量实时数据进行采集和处理。可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。

消息队列服务

在分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。

6. 与其他消息队列的对比

与 Kafka 的对比

  • 设计理念:Kafka 是一个分布式流处理平台,适用于大规模数据流处理和持久化;Stream 则更轻量,适合中小规模的消息队列和数据流处理。
  • 持久化:Kafka 的消息持久化设计更为复杂和可靠,适用于高可用性要求高的场景;Stream 的持久化通过 RDB 和 AOF 实现,相对简单。
  • 分区:Kafka 支持分区,可以水平扩展;Stream 不支持分区,需要通过客户端实现分片。

与 RabbitMQ 的对比

  • 协议支持:RabbitMQ 支持多种协议(如 AMQP、MQTT),适用范围更广;Stream 仅支持 Redis 协议。
  • 持久化:RabbitMQ 的持久化机制更为灵活和可靠;Stream 的持久化通过 Redis 实现,性能和可靠性略逊。
  • 功能特性:RabbitMQ 提供丰富的消息路由、优先级队列等特性;Stream 则更为简单和高效,适合基本的消息队列需求。
7. 实践案例

日志收集系统

在一个分布式系统中,日志收集是一个常见需求。可以使用 Redis Stream 实现一个高效的日志收集系统,将各个服务的日志通过 Stream 传递到日志处理服务,实现实时日志收集和分析。

实时监控系统

实时监控系统需要对大量实时数据进行采集和处理,可以使用 Redis Stream 构建一个高性能的实时监控系统。各个监控点将数据发送到 Stream,监控中心从 Stream 中读取数据进行处理和展示,实现对系统的实时监控。

消息队列服务

在一个分布式微服务架构中,各个服务之间的通信可以通过消息队列实现。Redis Stream 可以作为消息队列服务,实现服务之间的消息传递和处理,确保消息的可靠传递和高效处理。

8. 结论

Redis Stream 是 Redis 5.0 引入的一种强大的数据结构,具备持久化、多播、消费组等特性,适用于日志收集、实时监控、消息队列等场景。Stream 提供了灵活的消息处理能力,可以满足复杂的实时数据处理需求。通过对 Stream 的详细介绍和

操作演示,可以看到 Stream 在复杂数据流处理和消息传递中的强大功能。相比其他消息队列,Stream 更为轻量和高效,适合中小规模的应用场景。希望本文能够帮助读者深入理解和应用 Redis Stream,为实际项目提供有力支持。

相关文章:

7、Redis 队列与 Stream

引言 Redis 自 5.0 版本起引入了一种新的数据结构——Stream。这种数据结构不仅增加了 Redis 的数据处理能力,还使其在消息队列和数据流处理方面更具竞争力。Stream 提供了持久化、多播、消费组等功能,可以满足多种复杂的数据处理需求。 1. Redis Stre…...

FFT剖析

快速傅里叶变换 (fast Fourier transform) xn{x0,x1,…xn-1} (num:N) 旋转因子系数: d2pik/N 旋转因子 wk(n)(cos(dn)isin(dn)) n[0,N-1] y(k) sum(x(n)wk(n),0,N-1) y(k){y(0),y(1),…y(N-1)} 傅里叶级数 x(n)wk(n)的级数是: 1.d2pik/N 这个系数决…...

git clone报错RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly

问题描述 git clone github上的项目报错: RPC failed; curl 92 HTTP/2 stream 7 was not closed cleanly: CANCEL (err 8) 4796 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pac…...

Apispec,一个用于生成 OpenAPI(Swagger)规范的 Python 库

目录 01什么是 Apispec? 为什么选择 Apispec? 安装与配置 02Apispec 的基本用法 生成简单的 API 文档 1、创建 Apispec 实例 2、定义 API 路由和视图 3、添加路径到 Apispec 集成 Flask 和 Apispec 1、安装…...

SpringBoot 自定义异常返回数据格式

Spring Boot 默认异常处理 当我们用 spring boot 开发接口是,当遇到异常时返回的数据格式是如下形式的 {"timestamp": "2024-07-06T02:48:55.79100:00","status": 404,"error": "Not Found","path":…...

【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!!

视频演示 【xinference】(15):在compshare上,使用docker-compose运行xinference和chatgpt-web项目,配置成功!!! 1,安装docker方法: #!/bin/shdistribution$(…...

【Unity 3D角色移动】

【Unity 3D角色移动】 在Unity 3D中实现角色移动通常涉及到几个关键步骤,包括设置角色的物理属性、处理输入、更新角色的位置以及动画同步。下面是实现基本3D角色移动的步骤和示例代码: 步骤1:设置角色的物理属性 角色通常使用Character Co…...

个人视角,社会影响力:自媒体的魅力所在

随着数字化时代的到来,自媒体正成为信息传播领域的一场革命。个人视角与社会影响力的结合,赋予了自媒体独特的魅力。在传统媒体受限制的同时,自媒体为每个人提供了表达自己观点和思想的自由。个体的真实视角使得自媒体在信息传播中发挥着重要…...

算法训练营day70

题目1&#xff1a;108. 冗余连接 (kamacoder.com) #include<iostream> #include<vector>using namespace std;int n; vector<int> father(10001, 0);void init() {for(int i 1;i < n;i) father[i] i; }int find(int u) {return u father[u] ? u : fa…...

EtherCAT转Profinet网关配置说明第二讲:上位机软件配置

EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;&#xff0c;不仅可以实现数据之间的通信&#xff0c;还可以实现不同系统之间的数据共享。EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;具有高速传输的特点&#xff0c;因此通…...

日志自动分析-Web---360星图GoaccessALBAnolog

目录 1、Web-360星图(IIS/Apache/Nginx) 2、Web-GoAccess &#xff08;任何自定义日志格式字符串&#xff09; 源码及使用手册 安装goaccess 使用 输出 3-Web-自写脚本&#xff08;任何自定义日志格式字符串&#xff09; 4、Web-机器语言analog&#xff08;任何自定义日…...

【面试八股文】java基础知识

引言 本文是java面试时的一些常见知识点总结归纳和一些拓展&#xff0c;笔者在学习这些内容时&#xff0c;特地整理记录下来&#xff0c;以供大家学习共勉。 一、数据类型 1.1 为什么要设计封装类&#xff0c;Integer和int区别是什么&#xff1f; 使用封装类的目的 对象化:…...

ssrf结合redis未授权getshell

目录 漏洞介绍 SSRF Redis未授权 利用原理 环境搭建 利用过程 rockylinux cron计划任务反弹shell 写公钥免密登录 ubuntu 写公钥免密登录 漏洞介绍 SSRF SSRF&#xff08;server side request forgrey&#xff09;服务端请求伪造&#xff0c;因后端未过滤用户输入&…...

魔法自如:精通 IPython %automagic 命令的切换艺术

魔法自如&#xff1a;精通 IPython %automagic 命令的切换艺术 在 IPython 的神奇世界里&#xff0c;魔术命令是其强大交互功能的核心。这些以 % 或 %% 开头的命令&#xff0c;能够执行一系列特殊的操作&#xff0c;从而增强用户的编程体验。但是&#xff0c;你是否知道&#…...

基于CentOS Stream 9平台搭建MinIO以及开机自启

1. 官网 https://min.io/download?licenseagpl&platformlinux 1.1 下载二进制包 指定目录下载 cd /opt/coisini/ wget https://dl.min.io/server/minio/release/linux-amd64/minio1.2 文件赋权 chmod x /opt/coisini/minio1.3 创建Minio存储数据目录&#xff1a; mkdi…...

shell-awk语法整理

shell-awk语法整理 前言基本语法内置变量1. $02. NF3. NR4. FS5. RS6. OFS7. ORS8. FILENAME9. FNR10. ARGV11. ENVIRON12. IGNORECASE13. RSTART 和 RLENGTH示例解释 内置函数循环语句&#xff08;后面的;可不加&#xff09;条件语句高级特性示例 特殊模式BEGINEND组合示例BEG…...

关于忠诚:忠于自己的良知、理想、信念

关于忠诚&#xff1a; 当我们面对公司、上司、爱人、恋人、合作伙伴还是某件事&#xff0c;会纠结离开还是留下&#xff0c;这里我们要深知忠诚的定义&#xff0c;我们不是忠诚于某个人、某件事、或者某个机构&#xff0c;而是忠诚于自己的良知&#xff0c;忠诚于自己的理想和…...

探索Linux:开源世界的无限可能

Linux是一款开源操作系统&#xff0c;它的起源可以追溯到上世纪90年代初。这个故事始于一个名叫Linus Torvalds的芬兰大学生&#xff0c;他在1983年开始编写一个用于个人电脑的操作系统内核。在他的努力下&#xff0c;Linux逐渐发展成为一个稳定而强大的操作系统。 然而&#…...

深度学习之半监督学习:一文梳理目标检测中的半监督学习策略

什么是半监督目标检测&#xff1f; 传统机器学习根据训练数据集中的标注情况&#xff0c;有着不同的场景&#xff0c;主要包括&#xff1a;监督学习、弱监督学习、弱半监督学习、半监督学习。由于目标检测任务的特殊性&#xff0c;在介绍半监督目标检测方法之前&#xff0c;我…...

Hive 高可用分布式部署详细步骤

目录 系统版本说明 hive安装包下载及解压 上传mysql-connector-java的jar包 配置环境变量 进入conf配置文件中&#xff0c;将文件重命名 在hadoop集群上创建文件夹 创建本地目录 修改hive-site.xml文件 同步到其他的节点服务器 修改node02中的配置 hive-site.xml 修改…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...