当前位置: 首页 > article >正文

redis高级数据结构Stream

文章目录

  • 背景
  • stream概述
    • 消息 ID
    • 消息内容
    • 常见操作
    • 独立消费
    • 创建消费组
    • 消费
  • Stream弊端
    • Stream 消息太多怎么办?
    • 消息如果忘记 ACK 会怎样?
    • PEL 如何避免消息丢失?
    • 分区 Partition
  • Stream 的高可用
  • 总结

背景

为了解决list作为消息队列是无法支持消息多播问题,Redis5.0 多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者坦言 Redis Stream 狠狠地借鉴了 Kafka 的设计。

stream概述

在这里插入图片描述

Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的, Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份Stream 内部的消息会被每个消费组都消费到。

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。

消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

常见操作

  • 1、 xadd 追加消息
  • 2、 xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • 3、 xrange 获取消息列表,会自动过滤已经删除的消息
  • 4、 xlen 消息长度
  • 5、 del 删除 Stream

独立消费

我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消
息时,甚至可以阻塞等待。 Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group)的存在,就好比 Stream 就是一个普通的列表 (list)。

客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。block 0 表示永远阻塞,直到消息到来, block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil。类似kafka广播消费,客户端保存offset。

创建消费组

在这里插入图片描述
Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。其实就是kafka广播消费下客户端自己保存offset,消费者在消费前传递offset告知kafka从哪开始消费,果然是借鉴kafka,但是没做到kafka的层度(kafka客户端会自己保存在内存中,不需要使用者自己保存)。

消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

Stream弊端

Stream 消息太多怎么办?

读者很容易想到,要是消息积累太多, Stream 的链表岂不是很长,内容会不会爆掉?xdel指令又不会删除消息,它只是给消息做了个标志位。

Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

这个类似kafka的日志删除,只是这里固定为根据大小删除,当达到一定量就删除旧数据。

消息如果忘记 ACK 会怎样?

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到
了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。

PEL 如何避免消息丢失?

在客户端消费者读取 Stream 消息时, Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息ID 不能比PEL内的大,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的PEL 消息以及自 last_delivered_id 之后的新消息。这其实就像kafka提供seek方法用于客户端指定从哪个offset开始消费。

分区 Partition

Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。所以个人认为还是kafka强大,stream只是个对kafka进行大量阉割的消息队列,使用上请谨慎。

Stream 的高可用

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时, Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。

总结

如果真需要使用消息队列那么还是选择市面上大家认可的消息队列,以防在后期迭代时发现大量功能问题和性能问题。

相关文章:

redis高级数据结构Stream

文章目录 背景stream概述消息 ID消息内容常见操作独立消费创建消费组消费 Stream弊端Stream 消息太多怎么办?消息如果忘记 ACK 会怎样?PEL 如何避免消息丢失?分区 Partition Stream 的高可用总结 背景 为了解决list作为消息队列是无法支持消息多播问题,Redis5.0…...

day44 QT核心机制

头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QLabel> //标签类头文件 #include<QPushButton> //按钮类头文件 #include<QLineEdit> //行编辑器类头文件QT_BEGIN_NAMESPACE namespace Ui { class Widget; } …...

打家劫舍3

今天和打家讲一下打家劫舍3 题目&#xff1a; 题目链接&#xff1a;337. 打家劫舍 III - 力扣&#xff08;LeetCode&#xff09; 小偷又发现了一个新的可行窃的地区。这个地区只有一个入口&#xff0c;我们称之为root。 除了 root 之外&#xff0c;每栋房子有且只有一个“父“…...

webpack配置之---上下文

context context 是 Webpack 配置中的一个重要属性&#xff0c;它主要用于确定模块解析时的基础目录。可以理解为是 Webpack 在解析模块时&#xff0c;基于哪个目录作为根路径来查找模块。context 的默认值是 process.cwd()&#xff0c;即当前执行 Webpack 命令时的工作目录。…...

Spring Boot: 使用 @Transactional 和 TransactionSynchronization 在事务提交后发送消息到 MQ

Spring Boot: 使用 Transactional 和 TransactionSynchronization 在事务提交后发送消息到 MQ 在微服务架构中&#xff0c;确保消息的可靠性和一致性非常重要&#xff0c;尤其是在涉及到分布式事务的场景中。本文将演示如何使用 Spring Boot 的事务机制和 TransactionSynchron…...

2024中国行政区划多边形矢量数据(含有十段线)仅供学习

中国标准行政区划数据GS&#xff08;2024&#xff09;0650号&#xff0c;包括&#xff1a; 分省市县 省内分市 省内分县 南海十段线与岛屿区域 全国市级行政区划 通过网盘分享的文件&#xff1a;中国标准行政区划数据GS&#xff08;2024&#xff09;0650号.rar等4个文件 链接…...

给底部导航栏添加图形

文章目录 1. 概念介绍2. 修改方法2.1 修改属性2.2 包裹容器2.3 剪裁形状3. 代码与效果3.1 示例代码3.2 运行效果4. 内容总结我们在上一章回中介绍了"NavigationBar组件"相关的内容,本章回中将介绍如何修改NavigationBar组件的形状.闲话休提,让我们一起Talk Flutter…...

DeepSeek-R1 智能知识库系统使用指南

DeepSeek-R1 智能知识库系统使用指南 第一部分 基础操作教程 1.1 系统初始化 // 示例命令 > /initialize --configenterprise_knowledge --languagezh-CN [系统响应] 已加载企业知识图谱&#xff08;含12万实体/35万关系&#xff09;NLP引擎切换为中文混合语义模型1.2 基…...

#渗透测试#批量漏洞挖掘#WookTeam searchinfo SQL注入漏洞

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。 目录 一、漏洞概述 二、漏洞成因分析 1. 代码…...

leetcode 做题思路快查

128. 最长连续序列 arr 1 2 3 4 100 200; A. for将元素加入hash_set; B.对于每个x, x-1不在hash_set则x是bengin节点&#xff0c;begin_vev 1 , 100 , 200; C. 对于bengin_vec中&#xff0c;如果x在hash_set&#xff0c;则序列长度 151. 反转字符串中的单词151. 反转字符串…...

HarmonyOS Next 方舟字节码文件格式介绍

在开发中&#xff0c;可读的编程语言要编译成二进制的字节码格式才能被机器识别。在HarmonyOS Next开发中&#xff0c;arkts会编译成方舟字节码。方舟字节码长什么样呢&#xff1f;我们以一个demo编译出的abc文件&#xff1a; 二进制就是长这样&#xff0c;怎么去理解呢&…...

iOS主要知识点梳理回顾-2-多线程

iOS的多线程主要有三种方式&#xff0c;NSThread、GCD&#xff08;Grand Central Dispatch&#xff09;NSOperationQueue 开始&#xff0c;在iOS2发布的时候&#xff0c;苹果同步推出了NSthread和NSOperation。其中NSthread比较简单&#xff0c;仅提供了创建队列、开始、取消、…...

WPS如何接入DeepSeek(通过JS宏调用)

WPS如何接入DeepSeek 一、文本扩写二、校对三、翻译 本文介绍如何通过 WPS JS宏调用 DeepSeek 大模型&#xff0c;实现自动化文本扩写、校对和翻译等功能。 一、文本扩写 1、随便打开一个word文档&#xff0c;点击工具栏“工具”。 2、点击“开发工具”。 3、点击“查看代码”…...

【课程设计参考】迷宫小游戏 :基于 Python+Pygame+AI算法

一、内容 实现走迷宫 &#xff08;1&#xff09;游戏界面显示&#xff1a;迷宫地图、上下左右移动的特效。 &#xff08;2&#xff09;动作选择&#xff1a;上下左右键对应于上下左右的移动功能&#xff0c;遇到障碍的处理。 &#xff08;3&#xff09;得分统计功能&#xff…...

sa8295 qnx ais_camare如何支持一个摄像头两路vc输出?

当一个摄像头有两个vc输出的时候&#xff0c;如何更改驱动配置呢&#xff1f; 当一个摄像头可以输出两路vc&#xff0c;并且格式不同。根据每一路的vc图像数据格式修改串行器中maxxxx_mode_t里面的数组mode参数&#xff08;以下仅为例子&#xff09; struct maxxxx_mode_t ma…...

通过类加载和初始化的一些题目理解Java类加载过程

通过题目重点理解&#xff1a;Class加载流程和运行时区域 目录 子类和父类static变量父子类加载顺序2class.forName初始化 子类和父类static变量 class Parent {static int a 1;static int b 2;static int c;static {c 3;System.out.println("parent static block&quo…...

Coze(扣子)+ Deepseek:多Agents智能体协作开发新范式

前言 在当今数字化浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;技术的迅猛发展正深刻改变着我们的生活和工作方式。从智能语音助手到自动化流程机器人&#xff0c;AI 的应用无处不在&#xff0c;为我们提供了更加便捷、高效的服务。然而&#xff0c;对于非专业人士来…...

浅析Ruby类污染及其在Sinatra框架下的利用

和JavaScript中的原型链污染类似&#xff0c;Ruby中也存在类似的概念——类污染&#xff0c;两者都是对象进行不安全的递归合并导致的。 网上也没有相关的分析文章&#xff0c;只有下面这篇文章应该是第一次谈到这个问题 Class Pollution in Ruby: A Deep Dive into Exploiti…...

【NLP251】Transformer API调用

1. nn.Transformer nn.Transformer封装了Transformer中的包含编码器&#xff08;Encoder&#xff09;和解码器&#xff08;Decoder&#xff09;。如下图所示&#xff0c;它对Encoder和Decoder两部分的包装&#xff0c;它并没有实现输入中的Embedding和Positional Encoding和最…...

ubuntu下迁移docker文件夹

在 Ubuntu 系统中迁移 Docker 文件夹&#xff08;如 Docker 数据存储文件夹 /var/lib/docker&#xff09;到另一个磁盘或目录&#xff0c;通常是为了释放系统盘空间。以下是迁移过程的详细步骤&#xff1a; 1. 停止 Docker 服务 在进行迁移之前&#xff0c;必须停止 Docker 服…...

为AI聊天工具添加一个知识系统 之93 详细设计之34 Derivation 之 8 实现和平台

本文要点 要点 插入话题&#xff1a;实现 “实现”作为一个普通名词&#xff08;一般术语&#xff09;应该遵循第一性第二性第三性原则。其 第一性第二性第三性 分别是&#xff1a;完整性/鲁棒性/健壮性 &#xff0c;三者 分别注重 性能/功能/能力。即 首先是 实现完整性的性…...

idea 如何使用deepseek 保姆级教程

1.安装idea插件codegpt 2.注册deepseek并生成apikey deepseek 开发平台&#xff1a; DeepSeek​​​​​​​ 3.在idea进行codegpt配置 打开idea的File->Settings->Tools->CodeGPT->Providers->Custom OpenAI Chat Completions的URL填写 https://api.deepseek…...

python实现情绪识别模块,并将模块封装成可执行文件

目录&#xff1a; 1.源码&#xff1a;2.情绪识别模型运行流程&#xff1a;3.模型封装需要注意的地方&#xff1a;4.未解决问题&#xff1a; 1.源码&#xff1a; https://gitcode.com/xyint/deep_learning.git 2.情绪识别模型运行流程&#xff1a; 需要获取用户摄像头权限&…...

AH比价格策略源代码

用python 获取在A股和香港上市的公司和在A股和香港上市的公司股票代码和名称并且选出港股和A股涨幅相差比较大的股票 import akshare as akdef get_ah_stocks():# 获取A股股票列表a_stock_list ak.stock_zh_a_spot_em()print(a_stock_list)a_stock_list a_stock_list[[&quo…...

trimesh 加载obj mesh处理

目录 trimesh 加载obj trimesh入门 主要功能 安装 基本用法 1. 加载和保存 3D 模型 2. 几何操作 3. 网格分析 4. 可视化 5. 布尔运算 6. 碰撞检测 trimesh 加载obj template_mesh trimesh.load_mesh(r"E:\project\3d\lilpotat--pytorch3d\pixie_data\smplx_te…...

常见数据结构的C语言定义---《数据结构C语言版》

文章目录 1. 静态分配的顺序表2. 动态分配的顺序表3. 单 链 表4. 双 链 表5. 静态链表6. 顺序栈7. 链栈8. 顺序存储的队列9. 链式存储的队列10. 链式存储的二叉树11. 线索二叉树12. 树的双亲表示法13. 树的孩子兄弟表示法12. 图的邻接矩阵法13. 图的邻接表法1-13集合版本 #defi…...

C++小知识记录,不定时更新

1. 普通函数不能在头文件中定义&#xff1a; 当多个.cpp调用时&#xff0c;在编译链接时会在.o文件中重复定义报错 2. 为什么内联函数可以在头文件中定义&#xff1a;适用短小函数 当.cpp调用时&#xff0c;编译器只会在当前文件展开该函数&#xff0c;相当于每个.cpp会重新定…...

python--sqlite

1. 连接到数据库 使用 sqlite3.connect() 方法可以创建一个到SQLite数据库的连接。如果指定的数据库文件不存在&#xff0c;它会自动创建一个新的数据库文件。 import sqlite3# 连接到数据库&#xff0c;如果数据库文件不存在则会创建一个新的 conn sqlite3.connect(example…...

使用 Axios ——个人信息修改与提示框实现

目录 详细介绍&#xff1a;个人信息设置与修改页面实现 1. HTML 结构 2. CSS 样式 3. JavaScript 核心逻辑 a. 信息渲染与表单提交 b. 头像上传与预览 4. 功能详解 5. 总结 提示&#xff1a; 这段代码展示了如何创建一个简单的个人信息设置页面&#xff0c;包含用户个…...

群晖安装Gitea

安装Docker Docker运行Gitea 上传gitea包&#xff0c;下载地址&#xff1a;https://download.csdn.net/download/hmxm6/90360455 打开docker 点击印象&#xff0c;点击新增&#xff0c;从文件添加 点击启动 可根据情况&#xff0c;进行高级设置&#xff0c;没有就下一步 点击应…...