Redis:基于PubSub(发布/订阅)、Stream流实现消息队列
Redis - PubSub、Stream流
文章目录
- Redis - PubSub、Stream流
- 1.基于List的消息队列
- 2.基于PubSub的消息队列
- 3.基于Stream的消息队列
- 1.Redis Streams简介
- 2.Redis Streams基本命令
- 1.XADD 添加消息到末尾
- 2.XLEN 获取消息长度
- 3.XREAD 读取消息 (单消费模式)
- 4.XGROUP 消费组操作
- 5.XREADGROUP GROUP 从消费组读取消息
- 6.XACK 消息确认
- 7.XPENDING 查看pend数据
1.基于List的消息队列
- 由于redis的list数据结构为双向链表,则可以通过lpush和rpop来模拟队列效果
- 由于队列没有消息时候,需要阻塞获取队列数据,而
lpop和rpop在空队列获取数据时会返回null,所以需要使用brpop和blpop来进行阻塞获取
#向data1的list存两个数据
lpush data1 aaa bbb
#右监听data1 等待20秒
brpop data1 20

缺点:
- 无法避免消息丢失,
- 只支持单消费者,无法广播
2.基于PubSub的消息队列
基于发布订阅形式,可以广播,生产者向channel(信道)发送消息,可以由多个消息者去订阅,订阅的消费者都可以收到消息
SUBSCRIBE channel [channel ...] #订阅一个或多个信道
PUBLISH channel message #向一个信道发送消息
PSUBSCRIBE pattern [pattern ...] #通过通配符匹配订阅的信道 匹配规则 ?代表一个字符 []代表中括号内的可选字符 *代表任意字符
SUBSCRIBE log
PUBLISH log zhangsan

缺点:
- 不支持持久化
- 消息有上限,超出会导致消息丢失
3.基于Stream的消息队列
1.Redis Streams简介
官方文档:https://redis.io/docs/latest/commands/xadd/
Redis Stream是redis在5.x版本引入的新特性,Redis流是一种数据结构,它类似于一个只可追加的日志,但也实现了多种操作,以克服典型只可追加日志的一些限制。这些操作包括O(1)时间的随机访问和复杂的消费策略,如消费者组。你可以使用流来记录并同时实时分发事件。Redis流的使用案例包括:
- 事件溯源(例如,跟踪用户操作、点击等)
- 传感器监测(例如,现场设备的读数)
- 通知(例如,将每个用户的通知记录存储在单独的流中)
Redis为每个流条目生成一个唯一的ID。你可以使用这些ID在后续检索与其关联的条目,或者读取并处理流中的所有后续条目。请注意,由于这些ID与时间相关,这里显示的ID可能会有所不同,与你自己的Redis实例中看到的ID也会有所不同。
Redis流支持多种修剪策略(以防止流无限制地增长)和多种消费策略(参见XREAD、XREADGROUP和XRANGE)。
2.Redis Streams基本命令
stream消息队列相关命令:
- XADD - 添加消息到末尾
- XTRIM - 对流进行修剪,限制长度
- XDEL - 删除消息
- XLEN - 获取流包含的元素数量,即消息长度
- XRANGE - 获取消息列表,会自动过滤已经删除的消息
- XREVRANGE - 反向获取消息列表,ID 从大到小
- XREAD - 以阻塞或非阻塞方式获取消息列表
消费者组相关命令:
- XGROUP CREATE - 创建消费者组
- XGROUP CREATECONSUMER 给指定的消费者组添加消费者
- XREADGROUP GROUP - 读取消费者组中的消息
- XACK - 将消息标记为"已处理"
- XGROUP SETID - 为消费者组设置新的最后递送消息ID
- XGROUP DELCONSUMER - 删除消费者
- XGROUP DESTROY - 删除消费者组
- XPENDING - 显示待处理消息的相关信息
- XCLAIM - 转移消息的归属权
- XINFO - 查看流和消费者组的相关信息;
- XINFO GROUPS - 打印消费者组的信息;
- XINFO STREAM - 打印流信息
1.XADD 添加消息到末尾
1.基本语法
XADD是唯一可以向流中添加数据的 Redis 命令,但 还有其他命令,例如 XDEL 和 XTRIM,能够 从流中删除数据。
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
key:队列名[NOMKSTREAM]:队列不存在是否自动创建,默认自动创建[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] :设置消息队列的最大消息数量<* | id>:消息唯一id,*代表消息由redis自动生成,格式为时间戳-递增序列,可以手动指定field value:字段和值(键值对),可以一次添加多个
XADD users * name zhangsan age 18 #向user发送一条name为zhangsan,age为18的消息,返回消息id

2.指定stream的id参数
1526919030474-55
ID标识流中的给定消息数据。
如果指定的ID参数是*字符,XADD命令将自动生成一个唯一的ID。然而,尽管仅在极少数情况下有用,但可以指定一个格式良好的ID,以便新条目将与指定的ID完全相同。
XADD mystream 1526919030474-55 message "Hello,"
当自动生成ID时,第一部分是Redis实例生成ID的Unix时间(毫秒)。第二部分只是一个序列号,用于区分在同一毫秒内生成的ID。
XADD mystream 1526919030474-* message " World!"
还可以指定一个不完整的ID,只自动生成序列号部分(注意:6.0版本不支持报错)
stream的数据是有序的,所以消息的id始终的递增的,如果手动指定一个小于上一条数据的id则会出错
2.XLEN 获取消息长度
XLEN users #返回消息个数
3.XREAD 读取消息 (单消费模式)
基础语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
[COUNT count] :每次读取的最大数量[BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待STREAMS key:从那个队列读取消息,key为读取的队列名id [id ...]:起始id,代表从那个id的消息开始读取;0代表从第一个,$代表从最新的消息读取
测试
xread count 1 streams users 0 #读取users中最开始的一条数据

消息读取后不会删除,所有消费者都可以重复获取
xread count 1 streams users $ #读取最新消息 ,返回为nil空xread count 1 block 0 streams users $ #永久阻塞读取

但是阻塞方式监听到消息后会关闭,需要重新监听
此时在开发中我们可以使用死循环来无限读取最新消息进行监听
但是: 当指定起始id为$时,代表读取最新消息,如果处理消息过程中,又有超过一条以上的消息到达,则下次也只能获取一条最新的消息会导致其他数据漏读
4.XGROUP 消费组操作
-
消费者组:将多个消费者划分到同一个消费组,监听同一个队列 -
分流消费:队列中的消费将会分流给消费者组中的消费者,不会重复消费,加快消息消费速度 -
消息标识:消费者组会维护一个标识,记录最后一个被处理(非最新)的消息,即使redis挂机重启,也可以按照标识恢复读取,确保消息消费 -
消息确认机制:消费者获取消息后,消息处于pending状态,并存入一个pend-list,当处理完成时通过XACK来确认消息,标记为已处理,才pend-list移除
XGROUP CREATE 创建消费者组
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
key:队列名称group:消费者组名称<id | $>:起始id标识, 0代表第一个, $代表最新消息[MKSTREAM]:队列不存在时自动创建队列,如果不存在且不指定会报错[ENTRIESREAD entries-read]: redis7.0后的参数,
创建消费者组g1
XGROUP CREATE users g1 0

XGROUP CREATECONSUMER 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key group consumer
XGROUP DESTROY 删除指定的消费者组
XGROUP DESTROY key group
XGROUP DELCONSUMER 删除消费者组中指定的消费者
XGROUP DELCONSUMER key group consumer
5.XREADGROUP GROUP 从消费组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
group:消费者组名称consumer:消费者名称,如果不存在会自动创建[COUNT count] :每次读取的最大数量[BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待STREAMS key:从那个队列读取消息,key为读取的队列名[NOACK]无需消息确认(类似自动确认)。id [id ...]:起始id
**注意:**id取值:">" :从下一个未消费的消息开始,非最新消息,确保都消费
其他数字:根据指定id从pend-list中获取已消费但未确认消息,例如0,从pend-list第一个消息开始
所以当正常处理时的id都采用">" 进行消费,如果出现异常可以指定0,每次都读取第一个pend-list的消息,即每次都是读取最新的未处理数据,将异常数据处理掉
测试
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS users >

6.XACK 消息确认
XACK key group id [id ...]
测试
XACK users g1 1733738565351-0 1733738570018-0 1733738567511-0 1733738587327-0

7.XPENDING 查看pend数据
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
key:队列名称group:消费者组名称[IDLE min-idle-time]::查看过去空闲时间的以上的消息,比如给5000,则查询空闲时间5000ms以上的消息start end消息起始范围 “- +”代表所有count:获取数量[consumer]:获取那个消费者的
测试
XPENDING users g1 - + 10

参考来源:https://www.bilibili.com/video/BV1cr4y1671t/?spm_id_from=333.788.videopod.episodes&vd_source=97a7d9497f7eb9e537f6b50df8831e27&p=75
相关文章:
Redis:基于PubSub(发布/订阅)、Stream流实现消息队列
Redis - PubSub、Stream流 文章目录 Redis - PubSub、Stream流1.基于List的消息队列2.基于PubSub的消息队列3.基于Stream的消息队列1.Redis Streams简介2.Redis Streams基本命令1.XADD 添加消息到末尾2.XLEN 获取消息长度3.XREAD 读取消息 (单消费模式)4…...
C#飞行棋(新手简洁版)
我们要在主函数的顶部写一些全局静态字段 确保能在后续的静态方法中能够获取到这些值和修改 static int[] Maps new int[100];static string[] PlayerName new string[2];static int[] PlayerScore new int[2];static bool[] Flags new bool[2] {true,true }; static int[]…...
【OpenCV】图像转换
理论 傅立叶变换用于分析各种滤波器的频率特性。对于图像,使用 2D离散傅里叶变换(DFT) 查找频域。快速算法称为 快速傅立叶变换(FFT) 用于计算DFT。 Numpy中的傅立叶变换 首先,我们将看到如何使用Numpy查…...
力扣 重排链表-143
重排链表-143 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), next(next)…...
【Kubernetes理论篇】容器集群管理系统Kubernetes(K8S)
Kubernetes集群部署基本管理实战 这么好的机会,还在等什么! 01、Kubernetes 概述 K8S是什么 K8S 的全称为 Kubernetes (K12345678S),PS:“嘛,写全称也太累了吧,写”。不如整个缩写 K8s 作为缩写的结果…...
Kubernetes 常用操作大全:全面掌握 K8s 基础与进阶命令
Kubernetes(简称 K8s)作为一种开源的容器编排工具,已经成为现代分布式系统中的标准。它的强大之处在于能够自动化应用程序的部署、扩展和管理。在使用 Kubernetes 的过程中,熟悉常用操作对于高效地管理集群资源至关重要。本文将详…...
爬虫基础之Web网页基础
网页的组成 网页可以分为三大部分–HTML、CSS 和 JavaScript。如果把网页比作一个人,那么 HTML 相当于骨架、JavaScript 相当于肌肉、CSS 相当于皮肤,这三者结合起来才能形成一个完善的网页。下面我们分别介绍一下这三部分的功能。 HTML HTML(Hypertext…...
k8s, deployment
控制循环(control loop) for {实际状态 : 获取集群中对象X的实际状态(Actual State)期望状态 : 获取集群中对象X的期望状态(Desired State)if 实际状态 期望状态{什么都不做} else {执行编排动作…...
使用ensp搭建OSPF+BGP和静态路由,底层PC使用dhcp,实现PC互通
1.4种方式,实现PC2可以互通底层的所有设备 OSPF:OSPF是一种用于互联网协议网络的链路状态路由协议 BGP:是一种用于互联网上进行路由和可达性信息传递的外部网关协议(EGP) 静态路由: 静态路由是一种路由方…...
TÜLU 3: Pushing Frontiers in Open Language Model Post-Training
基本信息 📝 原文链接: https://arxiv.org/abs/2411.15124👥 作者: Nathan Lambert, Jacob Morrison, Valentina Pyatkin, Shengyi Huang, Hamish Ivison, Faeze Brahman, Lester James V. Miranda, Alisa Liu, Nouha Dziri, Shane Lyu, Yuling Gu, Sau…...
深入解读 MySQL EXPLAIN 与索引优化实践
MySQL 是当今最流行的关系型数据库之一,为了提升查询性能,合理使用 EXPLAIN 工具和优化索引显得尤为重要。本文将结合实际示例,探讨如何利用 EXPLAIN 分析查询执行计划,并分享索引优化的最佳实践。 一、EXPLAIN 工具简介 EXPLAIN …...
Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)
目录 agent特性ChannelSelector描述: SinkProcessor描述: 串联架构结构图解定义与描述配置示例Flume1(监测端node1)Flume3(接收端node3)启动方式 复制和多路复用结构图解定义描述配置示例node1node2node3启…...
ragflow连ollama时出现的Bug
ragflow和ollama连接后,已经添加了两个模型但是ragflow仍然一直warn:Please add both embedding model and LLM in Settings > Model providers firstly.这里可能是我一开始拉取的镜像容器太小,容不下当前添加的模型,导…...
基于centos7.7编译Redis6.0
背景: OS:CentOs 7.7 Redis: 6.0.6 编译构建报错如下: In file included from server.c:30:0: server.h:1044:5: error: expected specifier-qualifier-list before ‘_Atomic’_Atomic unsigned int lruclock; /* Clock for LRU eviction …...
uni-app项目无法在Android Studio模拟器上运行
目录 1 问题描述2 尝试解决3 引发原因4 解决方法4.1 换用 MuMu 模拟器 5 结语 1 问题描述 在使用 uni-app 开发 Pad 端 App 时,初始化项目后打算先运行一下确保初始化正常。打开 Android Studio 模拟器后,然后在 HbuilderX 中选择使用 App 标准基座 运…...
第一部分:Linux系统(基础及命令)
Linux操作系统的实操性非常强,纯操作,不适用于日常的办公使用 1.初始Linux 1.1 操作系统概述 1.1.1 了解OS的作用 OS:是计算机软件的一种,主要负责:作为用户和计算机硬件之间的桥梁,调度和管理计算机硬…...
No module named ‘_ssl‘ No module named ‘_ctypes‘
如果你使用的是基于 yum 的 Linux 发行版(例如 CentOS、RHEL、Fedora),安装 libc6-dev 的方式稍有不同。在这些系统中,通常对应的包是 glibc-devel。 No module named ‘_ctypes’ 使用 yum 安装 glibc-devel 更新系统的软件包列…...
【QT】编写第一个 QT 程序 对象树 Qt 编程事项 内存泄露问题
目录 1. 编写第一个 QT 程序 1.1 使用 标签 实现 🐇 图形化界面实现 🐇 纯代码形式实现 1.2 使用 按钮 实现 🐋 图形化界面实现 🐋 纯代码形式实现 1.3 使用 编辑框 实现 🥝 图形化界面实现 ᾕ…...
VTK编程指南<六>:VTK可视化管线与渲染详解
1、VTK渲染引擎 回顾前几章节的RenderCylinder示例 可以找到以下的类: vtkProp; ytkAbstractMapper; vtkProperty; vtkCamera; vtkLight; vtkRenderer; vtkRenderWindow; vtkRenderWindowInteractor vtkTransform; vtkLookupTable;可以发现这些类都是与数据显示或渲染相关的。…...
基于STM32的智能计步器
引言 随着健康意识的提高,计步器逐渐成为人们日常生活中重要的健康管理工具。本文将指导你如何使用STM32微控制器制作一个智能计步器。该计步器通过加速度传感器检测步伐,并使用OLED显示屏显示步数。通过这个项目,你将学习到STM32开发的基本流…...
5个Zutilo技巧让你成为Zotero文献管理高手
5个Zutilo技巧让你成为Zotero文献管理高手 【免费下载链接】Zutilo Zotero plugin providing some additional editing features 项目地址: https://gitcode.com/gh_mirrors/zu/Zutilo 还在为Zotero的批量操作烦恼吗?每天面对成百上千的文献条目,…...
基于宏观通胀预测模型的利率预期重定价:华尔街降息路径为何出现系统性回撤?CPI成为关键校准变量
摘要:本文通过宏观通胀预测模型,结合利率预期曲线重定价算法与市场情绪迁移分析,对当前美通胀路径、CPI数据影响及华尔街降息预期变化进行系统性建模,分析利率政策预期从宽松交易向数据依赖模式切换的结构性原因。一、市场情绪迁移…...
演讲口才课到底有没有用?上完三个月后的真实反馈
三个月前,林薇坐在会议室的角落里,手里攥着一份精心准备的方案,却迟迟没有开口。那一刻,她看着同事们侃侃而谈,心里反复问自己:为什么明明有想法,却说不出来?就是那个瞬间࿰…...
LazyLLM:低代码多智能体应用开发框架实战指南
1. 项目概述:LazyLLM,一个为懒人开发者准备的多智能体应用构建工具如果你和我一样,在尝试构建一个基于大语言模型的智能应用时,感到头大——不是被各种框架的API调用搞晕,就是被模型部署、服务编排、数据流设计这些工程…...
【M1 Mac游戏开发环境】从零到一:VSCode、Git与效率工具的终极配置指南
1. M1 Mac开箱配置:为Unity开发者量身定制 刚拿到M1 Mac的Unity开发者们,你们是否遇到过这样的场景:打开VSCode写C#脚本时智能提示迟迟不出现,Git命令输到一半发现没有自动补全,或是被各种环境配置问题折腾得焦头烂额&…...
车规级国际物联卡是什么?车载物联网硬件选型与行业标准解析
随着跨境整车出口、改装车辆、工程机械外销、车载定位终端普及,车载联网通信要求持续升级。普通民用SIM卡无法适配车辆颠簸、温差跨度大、高速移动、跨境切换网络的复杂工况,车规级国际物联卡逐步成为车载智能化硬件的标配通信载体。很多出海设备厂商容易…...
Tegra K1深度解析:192核GPU如何重塑移动游戏与异构计算
1. 项目概述:一次移动游戏体验的底层革命 2014年,当小米发布其首款平板电脑MiPad,英伟达(Nvidia)同步推出Shield Tablet时,整个移动计算领域,尤其是安卓游戏生态,感受到了一次来自底…...
EMAC寄存器配置与网络性能优化实战
1. EMAC寄存器概述与核心功能以太网媒体访问控制器(EMAC)是现代嵌入式系统中实现网络通信的核心硬件模块,其寄存器配置直接决定了数据传输的可靠性、实时性和效率。作为硬件与协议栈之间的桥梁,EMAC通过精心设计的寄存器组实现了对…...
城市道路自动驾驶避障规划与MPC跟踪控制【附仿真】
✨ 长期致力于自动驾驶、路径规划、速度规划、跟踪控制、模型预测控制研究工作,擅长数据搜集与处理、建模仿真、程序编写、仿真设计。 ✅ 专业定制毕设、代码 ✅如需沟通交流,点击《获取方式》 (1)SL图五次多项式代价路径决策与凸…...
reverse-geocoder未来展望:AI增强地理编码与智能位置预测
reverse-geocoder未来展望:AI增强地理编码与智能位置预测 【免费下载链接】reverse-geocoder A fast, offline reverse geocoder in Python 项目地址: https://gitcode.com/gh_mirrors/re/reverse-geocoder 在当今数据驱动的世界中,地理编码技术已…...
