当前位置: 首页 > news >正文

Redis之stream类型解读

目录

基本介绍

数据结构

消息

消费组

消费者

基本使用命令

概述 

xadd 命令

xtrim 命令

 xdel 命令

xlen 命令

xrange 命令

xread 命令 

xgroup 命令 

xreadgroup 命令

xack 命令


基本介绍

Redis stream(流)是一种数据结构,其作用类似于仅追加日志,但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,如消费者群体。 您可以使用流实时记录和同时联合事件。 

Redis 为每个stream(流)条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目,或读取和处理流中的所有后续条目。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

数据结构

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

消息

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]

  • 消息 ID:消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容:消息内容就是键值对,形如 hash 结构的键值对。

消费组

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息ID开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者

消费者内部会有一个状态变量pending_ids,维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

基本使用命令

概述 

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"2) 1) "name"2) "Sara"3) "surname"4) "OConnor"
2) 1) "1601372323627-1"2) 1) "field1"2) "value1"3) "field2"4) "value2"5) "field3"6) "value3"

 返回值:该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

xtrim 命令

XTRIM 将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"2) 1) "field1"2) "A"3) "field2"4) "B"5) "field3"6) "C"7) "field4"8) "D"

返回值:返回从流中删除的条目数。

 xdel 命令

从指定流中移除指定的条目,并返回成功删除的条目的数量。在传递的ID不存在的情况下,返回的数量可能与传递的ID数量不同。

XDEL key ID[ID ...]
  • key:队列名称。
  • ID:消息 ID 
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-02) 1) "a"2) "1"
2) 1) 1538561701744-02) 1) "c"2) "3"

返回值:删除的条目数量。

xlen 命令

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3

返回值:流中包含的条目数量

xrange 命令

xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"2) 1) "name"2) "Virginia"3) "surname"4) "Woolf"
2) 1) "1601372577811-1"2) 1) "name"2) "Jane"3) "surname"4) "Austen"

返回值:该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

xread 命令 

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
  • count:数量。
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
  • key :队列名。
  • id:消息 ID。
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"2) 1) 1) 1526984818136-02) 1) "duration"2) "1532"3) "event-id"4) "5"5) "user-id"6) "7782813"2) 1) 1526999352406-02) 1) "duration"2) "812"3) "event-id"4) "9"5) "user-id"6) "388234"
2) 1) "writers"2) 1) 1) 1526985676425-02) 1) "name"2) "Virginia"3) "surname"4) "Woolf"2) 1) 1526985685298-02) 1) "name"2) "Jane"3) "surname"4) "Austen"

返回值:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

当使用BLOCK时,超时时将返回一个空回复(nil)。

xgroup 命令 

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

xreadgroup 命令

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。

xack 命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。当一条消息交付到某个消费者时,它将被存储在PEL中等待处理,这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理,且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

XACK key group ID[ID ...]

返回值:该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认),而且XACK不会把他们算到成功确认的数量中。

相关文章:

Redis之stream类型解读

目录 基本介绍 数据结构 消息 消费组 消费者 基本使用命令 概述 xadd 命令 xtrim 命令 xdel 命令 xlen 命令 xrange 命令 xread 命令 xgroup 命令 xreadgroup 命令 xack 命令 基本介绍 Redis stream(流)是一种数据结构,其…...

C++ 网络编程项目fastDFS分布式文件系统(九)总结

1. Location语法 1. 语法规则 location [ |~|~ * |^~ ] /uri/ { … } 正则表达式中的特殊字符 : - . () {} [] * ? 2. Location 优先级说明 在 nginx 的 location 和配置中 location 的顺序没有太大关系。 与 location 表达式的类型有关。 相同类型的表达式&a…...

第五章 树与二叉树 一、树的定义与考点

一、定义 1.树是由n (n > 0) 个节点组成的有限集合。 2.当n0时,称为空树。 3.在非空树中,有且仅有一个节点没有前驱,其他节点都有且仅有一个前驱,称为根节点。 4.每个节点有零个或多个子节点,而每个子节点又有零…...

C语言基础之——指针(下)

前言:本篇文章将继续讲解有关指针的剩余基础知识。 学无止境,一起加油叭!! 目录 一.指针运算 1.指针 - 整数 2.指针的关系运算 3.指针 - 指针 二.指针与数组 三.二级指针 四.指针数组 总结 一.指针运算 指针运算包括以下三…...

小研究 - JVM 的类装载机制

本文通过对一个类装载实例的分析,阐明了 Java虚拟机的类装载的代理机制和由此定义的命名空间,指出了类装载机制在容器/组件/抽象框架结构中的作用。 目录 1 引言 2 实例 3 分析 3.1 类装载的代理机制 3.2 Java的命名空间 3.3 解决问题 4 应…...

项目---日志系统

目录 项目系统开发环境核心技术日志系统介绍为什么需要日志系统? 日志系统框架设计日志系统模块划分代码实现通用工具实现日志等级模块实现日志消息模块实现格式化模块实现落地模块实现日志器模块同步日志器异步日志器缓冲区实现异步工作器实现 回归异步日志器模块建造者模式日…...

设计模式--建造者模式(Builder Pattern)

一、什么是建造者模式 建造者模式(Builder Pattern)是一种创建型设计模式,它关注如何按照一定的步骤和规则创建复杂对象。建造者模式的主要目的是将一个复杂对象的构建过程与其表示分离,从而使同样的构建过程可以创建不同的表示。…...

若依vue打印的简单方法

像我们后端程序员做前端的话,有时候真不需要知道什么原理,直接塞就好了 我们选用基于hiprint 的vue-plugin-hiprint来打印 目的是为了实现点击某些行的数据,然后点击某个按钮直接弹出下面的打印 此链接 大佬是原创,我拿来总结梳理一下 插件进阶功能请移步: 链接 插件模板制作页…...

Rust 基础语法学习

Rust 基础语法学习 文章目录 Rust 基础语法学习hello world变量数据类型整数类型进制表示方法浮点数类型布尔类型字符类型字符串复合类型元组结构体元组结构体 切片类型字符串切片数组切片 不可变变量与可变变量常量注释函数语句与表达式 流程控制语句if else条件判断while循环…...

iOS开发Swift-函数

1.函数的定义和调用 func greet(person: String) -> String { // 函数名 传入值 传入值类型 返回值类型let greeting "Hello" personreturn greeting } print( greet(person: "Anna") ) //调用2.函数的参数与返回值 (1)无参函数 func sayHe…...

序列化协议:JSON和XML

作者:CARROT 链接:https://www.zhihu.com/question/604811576/answer/3100483698 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 json和xml都是数据传输的格式。比如我们开发过程中需要和网…...

江西萍乡能源石油化工阀门三维扫描3d测量抄数建模-CASAIM中科广电

长期以来,石油天然气、石油石化、发电和管道输送行业在环保、健康和安全保障方面一直承受着巨大的压力,他们必须确保相关规程在各项作业中得到全面贯彻。 阀门作为流体管道运输中的组成部分,其装配密封度是保证流体运输安全的重要一环&#…...

Go【gin和gorm框架】实现紧急事件登记的接口

简单来说,就是接受前端微信小程序发来的数据保存到数据库,这是我写的第二个接口,相比前一个要稍微简单一些,而且因为前端页面也是我写的,参数类型自然是无缝对接_ 前端页面大概长这个样子 先用apifox模拟发送请求测试…...

第一个VUE程序?

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title></head> <body><div id"app">{{message}} </div><!-- 1.导入Vue.js --> <script s…...

电阻器件的分类

电阻器的种类碳膜电阻膜式电阻器中的一种。气态碳氢化合物在高温和真空中分解&#xff0c;碳沉积在瓷棒或者瓷管上&#xff0c;形成一层结晶碳膜。改变碳膜厚度和用刻槽的方式变更碳膜的长度可以得到不同的阻值。碳膜电阻成本较低&#xff0c;电性能和稳定性较差&#xff0c;一…...

QT基础教程之二 第一个Qt小程序

QT基础教程之二 第一个Qt小程序 按钮的创建 在Qt程序中&#xff0c;最常用的控件之一就是按钮了&#xff0c;首先我们来看下如何创建一个按钮 QPushButton * btn new QPushButton; 头文件 #include <QPushButton>//设置父亲btn->setParent(this);//设置文字btn-&g…...

Edge用户数据目录查找

创建 Microsoft Edge 用户数据目录变量...

最新外卖霸王餐小程序、H5、微信公众号版外卖系统源码|霸王餐美团/饿了么系统/外卖红包cps粉丝裂变玩法源码下载

最新外卖霸王餐小程序、H5、微信公众号版外卖系统源码、霸王餐美团、饿了么系统&#xff0c;粉丝裂变玩源码下载&#xff0c;外卖cps小程序项目&#xff0c;外卖红包cps带好友返利佣金分销系统程序、饿了么美团联盟源码&#xff0c;外卖cps带分销返利后端源码&#xff0c;基于L…...

数据库事务四大特性

事务的4大特性&#xff08;ACID&#xff09;&#xff1a; 原子性(Atomicity)&#xff1a; 事务是数据库的逻辑工作单位&#xff0c;它对数据库的修改要么全部执行&#xff0c;要么全部不执行。 一致性(Consistemcy)&#xff1a; 事务前后&#xff0c;数据库的状态都满足所有的完…...

浅谈Router和Route

router 和 route 是在前端框架中用于管理和处理路由的两个关键概念。这两者之间的关系可以通过具体的代码来解释。在本示例中&#xff0c;我将使用 React 和 React Router 来说明它们之间的关系。 Router&#xff08;路由器&#xff09;&#xff1a;Router 是一个库或框架&…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

JS手写代码篇----使用Promise封装AJAX请求

15、使用Promise封装AJAX请求 promise就有reject和resolve了&#xff0c;就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释

以Module Federation 插件详为例&#xff0c;Webpack.config.js它可能的配置和含义如下&#xff1a; 前言 Module Federation 的Webpack.config.js核心配置包括&#xff1a; name filename&#xff08;定义应用标识&#xff09; remotes&#xff08;引用远程模块&#xff0…...