当前位置: 首页 > news >正文

Redis 数据类型Streams

目录

1 基本特性

2 主要操作命令 

2.1 XADD key ID field value [field value ...]

2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

2.3 XRANGE key start end [COUNT count]

2.4 XREVRANGE key end start [COUNT count]

2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]

2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

2.7 XACK key group ID [ID ...]

2.8 XPENDING key group [start end count] [IDLE idle]

2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]

2.10 XINFO 

2.11  XDEL key ID [ID ...]

2.12 XTRIM key MAXLEN [~] len

3 使用场景


Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事件日志以及其他需要持久化和高效处理时间序列数据的应用场景。

1 基本特性

  • 持久性:与传统的发布/订阅不同,Streams 中的消息是持久化的,即使客户端断开连接后重新连接,仍然可以访问到之前的消息。

  • 多消费者支持:支持多个消费者组(consumer groups),每个组可以独立地消费流中的消息。消费者组允许不同的消费者处理相同的消息,但每个消息在一个组内只能被一个消费者处理一次。

  • 消息 ID 和范围查询:每条消息都有一个唯一的 ID,由时间戳和序列号组成。可以通过指定消息 ID 范围来获取特定时间段内的消息。

  • 阻塞读取:支持阻塞读取(XREAD 和 XREADGROUP 命令的 BLOCK 选项),使得客户端可以在没有新消息时等待一段时间。

  • 自动删除:可以设置最大长度(MAXLEN 选项)来限制流的大小,超过长度的消息会自动被删除。

  • 灵活的消息格式:每条消息可以包含多个字段-值对,类似于哈希表,这使得消息可以携带丰富的信息。

2 主要操作命令 

2.1 XADD key ID field value [field value ...]

向指定的流中添加一条新消息,ID 可以是 *(表示自动生成)或指定的时间戳和序列号。

127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5
"1729306027171-0"

返回的结构可以分为两部分:

  • 时间戳1729306027171 (表示条目被添加的时间,单位是毫秒。你可以将这个时间戳转换为可读的日期和时间格式。
  • 序列号0 (表示在同一毫秒内这是第一个条目。如果在同一毫秒内添加了多个条目,序列号将会递增,例如 1729306027171-11729306027171-2 等。
2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • 从一个或多个 Stream 中读取数据。
  • COUNT 指定返回的最大条目数。
  • BLOCK 指定在没有新消息时阻塞的时间(毫秒)。
  • STREAMS 指定要读取的 Stream 和起始 ID。
127.0.0.1:6379> xread count 2 streams mystream 0-0
1) 1) "mystream"2) 1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
2.3 XRANGE key start end [COUNT count]
  • 返回指定 ID 范围内的条目。
  • start 和 end 是 ID,可以使用 - 表示最小 ID,+ 表示最大 ID。
127.0.0.1:6379> xrange mystream - +
1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
2.4 XREVRANGE key end start [COUNT count]

返回指定 ID 范围内的条目,但按逆序排列。

127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5
"1729329067777-0"
127.0.0.1:6379> xadd mystream * sensor_id 345
"1729329079135-0"
127.0.0.1:6379> xrevrange mystream + - count 2
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
2) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]
  • 创建一个新的消费者组。
  • id-or-$ 是起始位置,可以是具体的 ID 或 $ 表示只消费新的条目。
  • MKSTREAM 如果 Stream 不存在则创建它。
127.0.0.1:6379> xgroup create mystream mygroup 0
OK
2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • 从消费者组中读取数据。
  • GROUP:指定消费者组的名称。
  • consumer:指定消费者的名称。
  • COUNT count:可选参数,指定一次最多读取的消息数量。
  • BLOCK milliseconds:可选参数,如果当前没有可用的消息,命令将阻塞指定的时间(以毫秒为单位),等待新消息的到来。
  • NOACK: 表示不确认消息,通常用于快速消费。
  • STREAMS:指定要读取的流及其对应的 ID。ID 通常是一个特殊值 >,表示只读取新的消息;也可以是具体的 ID,表示从该 ID 开始读取。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"2) 1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"2) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2.7 XACK key group ID [ID ...]

确认已处理的消息。XACK 命令用于确认消费者组中的消息已经被成功处理。当你使用 XACK 命令时,Redis 会将指定的消息从“待处理”状态转换为“已确认”状态,并从消费者的待处理列表中移除。

127.0.0.1:6379> xack mystream mygroup 1729329067777-0
(integer) 1
127.0.0.1:6379> xack mystream mygroup 1729329079135-0
(integer) 0

当 XACK 命令成功确认一条消息时,返回值为 1,表示该消息已经被确认并且从待处理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 处理的,并且现在调用 XACK 确认它,那么这条消息将不再出现在 consumer1 的待处理列表中。

2.8 XPENDING key group [start end count] [IDLE idle]

查看待处理的消息。

127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1729306027171-0"
3) "1729306027171-0"
4) 1) 1) "consumer1"2) "1"
127.0.0.1:6379> xack mystream mygroup 1729306027171-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]

用于将一个或多个消息从一个消费者转移到另一个消费者。这个命令通常用于处理消息超时或重新分配消息的情况。XCLAIM 允许你手动将消息从一个消费者的待处理列表移动到另一个消费者的待处理列表。

127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"2) 1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
  • mystream:流的名称。
  • mygroup:消费者组的名称。
  • consumer2:目标消费者的名称,即消息将被转移给这个消费者。
  • 10000:消息的空闲时间(以毫秒为单位)。只有那些空闲时间超过这个值的消息才会被转移。
  • 1729329079135-0:要转移的消息 ID。
2.10 XINFO 

获取 Stream 或消费者组的信息。

127.0.0.1:6379> xinfo stream mystream1) "length"2) (integer) 33) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"8) (integer) 19) "last-generated-id"
10) "1729329079135-0"
11) "first-entry"
12) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
13) "last-entry"
14) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"2) "mygroup"3) "consumers"4) (integer) 25) "pending"6) (integer) 17) "last-delivered-id"8) "1729329079135-0"
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"2) "consumer1"3) "pending"4) (integer) 05) "idle"6) (integer) 255317
2) 1) "name"2) "consumer2"3) "pending"4) (integer) 15) "idle"6) (integer) 191940

XINFO STREAM mystream

  • length:

    • 流中的消息总数:3 条。
  • radix-tree-keys:

    • 用于存储流数据的 radix tree 中的键的数量:1 个。
  • radix-tree-nodes:

    • 用于存储流数据的 radix tree 中的节点数量:2 个。
  • groups:

    • 与该流关联的消费者组数量:1 个。
  • last-generated-id:

    • 流中最后生成的消息 ID:1729329079135-0
  • first-entry:

    • 流中的第一条消息:
      • 消息 ID: 1729306027171-0
      • 消息内容:
        • sensor_id123
        • temmperature22.5
  • last-entry:

    • 流中的最后一条消息:
      • 消息 ID: 1729329079135-0
      • 消息内容:
        • sensor_id345

XINFO GROUPS mystream

  • name:

    • 消费者组的名称:mygroup
  • consumers:

    • 该组中的消费者数量:2 个。
  • pending:

    • 该组中待处理的消息数量:1 条。
  • last-delivered-id:

    • 该组中最后一个被交付的消息 ID:1729329079135-0

XINFO CONSUMERS mystream mygroup

  • 第一个消费者:

    • nameconsumer1
    • pending: 待处理的消息数量:0 条
    • idle: 空闲时间(以毫秒为单位):255,317 毫秒(约 4 分钟 15 秒)
  • 第二个消费者:

    • nameconsumer2
    • pending: 待处理的消息数量:1 条
    • idle: 空闲时间(以毫秒为单位):191,940 毫秒(约 3 分钟 12 秒)
2.11  XDEL key ID [ID ...]

从 Stream 中删除一个或多个条目。

127.0.0.1:6379> xdel mystream 1729306027171-0
(integer) 1
127.0.0.1:6379> xrange mystrea - +
(empty list or set)
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
2.12 XTRIM key MAXLEN [~] len

修剪 Stream,保留最多 len 个条目,~ 表示近似长度。

127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xtrim mystream maxlen 1
(integer) 1
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"

3 使用场景

  • 日志记录:可以用来存储系统的日志信息,方便后续分析和处理。
  • 事件流:处理实时事件,如传感器数据、用户行为等。
  • 消息队列:实现可靠的消息传递系统,支持多个消费者组。
  • 任务队列:管理后台任务,确保任务被正确处理。

更多命令请参考:Commands | Docs 

相关文章:

Redis 数据类型Streams

目录 1 基本特性 2 主要操作命令 2.1 XADD key ID field value [field value ...] 2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 2.3 XRANGE key start end [COUNT count] 2.4 XREVRANGE key end start [COUNT count] 2.5 XGROUP …...

基智科技CEO张文战:探索火山引擎数据飞轮模式下的大模型应用新机会

9月下旬,火山引擎数据飞轮研讨会在北京举办,北京基智科技有限公司(以下简称“基智科技”)CEO张文战作为积极探索大模型应用领域的企业代表,围绕“数据飞轮如何转进企业业务流”展开主题分享,并介绍基智科技…...

【AUTOSAR标准文档】AotuSar结构横向分层详解(RTE、BSW)

Top view The AUTOSAR Architecture distinguishes on the highest abstraction level between three software layers: Application, Runtime Environment and Basic Software which run on a Microcontroller. 译文:AUTOSAR架构在最高抽象层次上将软件分为三层&…...

新 Chrome 插件可检测 AI 伪造声音;Canary Speech 推出用于临床对话的语音分析技术丨 RTE 开发者日报

开发者朋友们大家好: 这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…...

1. 路由定义

1. 通过配置文件形式 配置方式与laravel的配置方式相似 <?php use Hyperf\HttpServer\Router\Router;Router::get(/hello-hyperf, function () {return Hello Hyperf.; });// 设置一个 GET 请求的路由&#xff0c;绑定访问地址 /get 到 App\Controller\IndexController 的 …...

我们可以用微服务创建状态机吗?

大家好&#xff0c;我是锋哥。今天分享关于【我们可以用微服务创建状态机吗&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; 我们可以用微服务创建状态机吗&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 是的&#xff0c;微服务架构可…...

邦芒贴士:职场新人需远离的7种坏习惯

咱们每一个人都会有这样那样的毛病&#xff0c;而试用期就是试毛病的大小。对于职场新人来说&#xff0c;第一份工作很容易暴露这样那样的职业毛病。职业习惯直接决定了我们以后的职业发展&#xff0c;职业能力。对于职场新人来说&#xff0c;在试用期内&#xff0c;一些职场坏…...

面向医院的统一支付平台产品经验分享

我们面向医院的统一支付平台其实应该属于四方平台的范畴,依托于微信、支付宝等第三方支付平台和银联、银行等渠道生存。 二、医院常见系统说明: 先普及一下医院的系统情况: HIS(医院信息系统Hospital Information System):医院内的核心系统,为医院所属各部门提供病人诊…...

http作业

配置nginx服务通过ip访问多网站 1、前提配置 2、安装nginx服务 3、配置多IP 在linux主机上查看ip地址 4、定义nginx文件 5、在主机创建文件&#xff0c;重启nginx服务 6、测试...

AlDente Pro for Mac电脑 充电限制保护工具 安装教程【简单,轻松上手】

Mac分享吧 文章目录 AlDente Pro for Mac 充电限制保护工具 安装完成&#xff0c;软件打开效果一、AlDente Pro for Mac 充电限制保护工具 Mac电脑版——v1.28.41️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件&#xff0c;将安装包从左侧拖入右侧文件夹中&#xff0c;等…...

C语言数据结构之算法复杂度

目录 一、数据结构是什么 二、算法是什么 三、算法的效率 3.1 复杂度的概念 四、时间复杂度 4.1 大O渐进表示法 4.2 算法题分析 五、空间复杂度 5.1 复杂度对比 5.2 算法题题分析 正文开始 一、数据结构是什么 每个计算机专业的同学在大学都会接触到一门计算机必修课《数…...

HDU RSA

翻译成中文后&#xff1a; 思路&#xff1a;由题易得&#xff0c;d * e y * f ( n ) 1 ,且gcd ( e , f ( n ) ) 1,所以用扩展欧几里得求出 d &#xff0c;但要保证 d 是非负的&#xff0c;最有用快速幂求出每个字符即可。 #include<bits/stdc.h> using namespace std;…...

数据仓库建设 : 主题域简介

在数据仓库建设中&#xff0c;主题域是数据模型的一个重要概念&#xff0c;它帮助构建逻辑清晰、层次分明的数据结构。主题域的设计基于企业的业务结构&#xff0c;将业务中的关键部分提炼出来&#xff0c;划分为若干个主题域。每个主题域对应一个特定的业务领域&#xff0c;便…...

开源表单生成器OpnForm

什么是 OpnForm &#xff1f; OpnForm 是一个开源的表单构建工具&#xff0c;旨在简化创建自定义表单的过程&#xff0c;特别适合无编码知识的用户。它通过人工智能优化表单创建流程&#xff0c;支持多种用途&#xff0c;如联系人表单、调查表等。OpnForm 提供了一个直观的拖放…...

Zookeeper面试整理-Zookeeper的基础概念

Zookeeper的基础概念是理解其作为分布式协调服务的核心要素。以下是一些关键的基础概念: 1. Zookeeper是什么? Zookeeper 是一个开源的分布式协调服务,用于分布式应用中的配置管理、命名服务、分布式锁、集群管理等任务。它提供了一组简单的原语,帮助开发人员构建健壮的分布…...

验证archive_command配置是否正确

要验证 archive_command 配置是否正确&#xff0c;你可以按照以下步骤进行&#xff1a; ‌检查配置文件‌&#xff1a; 确保 postgresql.conf&#xff08;或你的 PostgreSQL 实例使用的任何自定义配置文件&#xff09;中的 archive_command 已经设置为你想要的命令。 ‌重启 …...

2024.10.19小米笔试题解

第一题数独计数 考虑dfs遍历所有情况 n = int(input())def check(grid, x, y, v):dx = [1, 0, -1, 0]dy = [0, 1, 0, -1]for i in range(4):nx, ny = x + dx[i], y + dy[i]if 0 <= nx < 3 and 0 <= ny < 3:if grid[nx][ny] == 0:continueif abs(grid[nx][ny] - v…...

SQL-SERVER导入excel表格

首先先找到数据源&#xff0c;如上图。我们用的是excel表格。 这里你需要选择excel版本&#xff0c;反正你随便选&#xff0c;应该没什么问题的。 再导入数据 我们需要导入最后那个&#xff0c;也就是OLE DB Provider for SQL SERVER 只有这个才能导入到当前的数据库中 接下来…...

Vue学习笔记(三、v-cloak、v-text、v-html指令)

一、 v-cloak v-cloak 是 Vue.js 提供的一个特殊指令&#xff0c;用于在 Vue 实例准备完毕并开始进行 DOM 编译之前隐藏未编译的模板。它通常用于防止页面闪烁或者展示未编译的 Vue 模板语法。 你可以简单地在 HTML 元素上添加 v-cloak 指令&#xff0c;然后在确保 Vue…...

Java | Leetcode Java题解之第496题下一个更大元素I

题目&#xff1a; 题解&#xff1a; class Solution {public int[] nextGreaterElement(int[] nums1, int[] nums2) {Map<Integer, Integer> map new HashMap<Integer, Integer>();Deque<Integer> stack new ArrayDeque<Integer>();for (int i num…...

手把手教你用Matlab把PLL相噪曲线算成Jitter(附三种方法源码)

从PLL相噪曲线到Jitter计算的Matlab实战指南 在射频系统设计中&#xff0c;锁相环(PLL)的相位噪声性能直接影响通信质量与系统稳定性。频谱分析仪虽能捕捉相噪曲线&#xff0c;但工程师常需将其转换为更直观的时间抖动(Jitter)指标。本文将系统介绍三种Matlab实现方案&#xff…...

vscode如何添加ollama本地模型-实现token自由

vscode一直支持的都是云端闭源的模型&#xff0c;例如 GPT Claude等等&#xff0c;当这些闭源模型的免费额度用完之后&#xff0c;则需要付费继续使用。本文介绍的是vscode接入ollama的本地模型&#xff0c;从而实现token自由。 ollama 首先需要到ollama的官网下载ollama应用…...

Simulink新手必看:从零搭建四轴飞行器仿真模型(附完整代码)

Simulink实战&#xff1a;四轴飞行器仿真建模全流程解析 四轴飞行器作为无人机领域的经典构型&#xff0c;其控制系统的设计与验证一直是工程师和科研人员的重点课题。对于刚接触Simulink的开发者而言&#xff0c;如何将复杂的飞行动力学转化为可视化的仿真模型往往令人望而生畏…...

如何在ComfyUI中智能合成视频序列:VHS_VideoCombine节点的专业应用方案

如何在ComfyUI中智能合成视频序列&#xff1a;VHS_VideoCombine节点的专业应用方案 【免费下载链接】ComfyUI-VideoHelperSuite Nodes related to video workflows 项目地址: https://gitcode.com/gh_mirrors/co/ComfyUI-VideoHelperSuite 面对AI生成的大量图像序列&…...

Qwen2.5-7B微调保姆级教程:单卡十分钟快速上手,小白也能搞定

Qwen2.5-7B微调保姆级教程&#xff1a;单卡十分钟快速上手&#xff0c;小白也能搞定 1. 前言&#xff1a;为什么选择Qwen2.5-7B进行微调 大模型微调听起来很高深&#xff1f;其实没那么复杂。今天我要带大家用最简单的方式&#xff0c;在单张显卡上10分钟内完成Qwen2.5-7B模型…...

终极指南:5分钟学会用Wallpaper Engine下载器轻松获取创意工坊壁纸

终极指南&#xff1a;5分钟学会用Wallpaper Engine下载器轻松获取创意工坊壁纸 【免费下载链接】Wallpaper_Engine 一个便捷的创意工坊下载器 项目地址: https://gitcode.com/gh_mirrors/wa/Wallpaper_Engine 还在为Steam创意工坊里精美的动态壁纸无法直接下载而烦恼吗&…...

快速体验:Python3.8镜像开箱即用,无需配置直接写代码

快速体验&#xff1a;Python3.8镜像开箱即用&#xff0c;无需配置直接写代码 1. Python3.8镜像简介 Python作为当下最流行的编程语言之一&#xff0c;其3.8版本在性能优化和功能完善方面达到了一个成熟稳定的阶段。这个预配置好的Python3.8镜像&#xff0c;让你可以完全跳过繁…...

从‘硬’开关到‘软’启动:拆解一个经典PMOS缓启动电路,聊聊D4、D6这些二极管到底在忙啥?

从‘硬’开关到‘软’启动&#xff1a;拆解一个经典PMOS缓启动电路&#xff0c;聊聊D4、D6这些二极管到底在忙啥&#xff1f; 在硬件设计中&#xff0c;电源管理电路如同交响乐团的指挥&#xff0c;协调着各个器件的动作节奏。而缓启动电路&#xff0c;则是这位指挥手中那根至关…...

像素剧本圣殿部署指南:Qwen2.5-14B-Instruct在生产环境中稳定运行的GPU显存优化技巧

像素剧本圣殿部署指南&#xff1a;Qwen2.5-14B-Instruct在生产环境中稳定运行的GPU显存优化技巧 1. 项目概述 像素剧本圣殿&#xff08;Pixel Script Temple&#xff09;是一款基于Qwen2.5-14B-Instruct大模型深度微调的专业剧本创作工具。它将先进的AI推理能力与独特的8-Bit…...

GRPO实战:如何用多个reward function优化你的RL模型?(附完整代码示例)

GRPO实战&#xff1a;多奖励函数融合策略与代码实现指南 强化学习模型的效果很大程度上取决于奖励函数的设计。单一奖励函数往往难以全面评估复杂任务&#xff0c;而多奖励函数融合策略能更精准地引导模型学习。本文将深入探讨GRPO框架中多奖励函数的实战应用&#xff0c;从原理…...