【redis-06】redis的stream流实现消息中间件
redis系列整体栏目
| 内容 | 链接地址 |
|---|---|
| 【一】redis基本数据类型和使用场景 | https://zhenghuisheng.blog.csdn.net/article/details/142406325 |
| 【二】redis的持久化机制和原理 | https://zhenghuisheng.blog.csdn.net/article/details/142441756 |
| 【三】redis缓存穿透、缓存击穿、缓存雪崩 | https://zhenghuisheng.blog.csdn.net/article/details/142577507 |
| 【四】redisson实现分布式锁实战和源码剖析 | https://zhenghuisheng.blog.csdn.net/article/details/142646301 |
| 【五】redis保证和mysql数据一致性 | https://zhenghuisheng.blog.csdn.net/article/details/142687101 |
| 【六】redis的stream流实现消息中间件 | https://zhenghuisheng.blog.csdn.net/article/details/142721269 |
如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142721269
redis的stream流实现消息中间件
- 一,redis的新特性-队列stream
- 1,redis的stream流基本使用
- 2,stream队列消息消费
- 2.1,单消费者
- 2.2,消费者组
- 2.2.1,订阅消费者组
- 2.2.2,消息消费
- 3,stream出现之前如何实现消息中间件
- 3.1,list类型实现
- 3.2, Pub/Sub 发布订阅模式
- 4,stream底层设计及优化
- 4.1,队列设置最大值
- 4.2,使用消费者组
- 4.3,消息的应答机制
- 4.4,优化点
一,redis的新特性-队列stream
在了解这个redis的新特性之前,可以先查看一下官网的详细文档:stream流官方文档
redis的stream流是从5.0版本才开始提出,本人这里安装的是 6.2.6 版本。它的底层原理是借鉴于kafka的底层实现,因此可以参考本人前面的写的kafka的文章。redis 的stream流队列其主要组件有:消息队列、生产者、消费者、消费者组、消息及消息id、偏移量等

建立这个stream的主要原因,是作者想通过这个redis来取代mq那些中间件,redis在项目中时必不可少的,但是引入mq就会多引入一个第三方的中间件,让系统稳定性没那么高,mq一挂就有可能导致整个系统瘫痪
1,redis的stream流基本使用
创建一个stream队列的命令如下,通过xadd的方式实现往队列中添加消息。如下创建一个商品的队列,然后设置商品type类型为小米手机,商品名称name为小米8,得到的结果如下图
xadd product_queue * type xiaomi name xiaomi8
product_quque 表示队列的名称,***** 表示由服务器自动生成一个id,其id通过时间戳+序号(毫秒时间内第n条消息)

可以直接通过 xlen 命令查看队列的长度,可以发现已经队列的长度为4
xlen product_queue

也可以直接通过 xrange 命令将全部的消息展示出来,在后面需要加上 - + 两个命令操作符,也可以在后面加一个id来设置范围 。
- - 表示在这个队列中的最小的id,
- + 表示在这个队列里面最大的id
xrange product_queue - +
xrange product_queue - 1728139368101-0 //获取前两个
xrange product_queue 1728139374509-0 + //获取后两个

删除命令也比较简单,可以直接通过 xdel 命令实现删除,执行完命令之后,可以发现队列中的数据已被删除。但是这个xdel使用的是逻辑删除消息,而不是物理删除。
xdel product_queue 1728139368101-0

也可以查看整个stream队列的详细信息,可以直接通过 xinfo 命令来实现。其返回信息如下,length表示返回4条数据,
xinfo stream product_queue
返回的消息如下,会将整个队列的信息详细的返回,并且根据不同的redis版本返回一些不同的额外参数
127.0.0.1:6379> XINFO STREAM product_queue1) "length" 2) (integer) 5 //表示5条数据3) "radix-tree-keys" 4) (integer) 1 //用于存储 Stream 元素的 Radix Tree 中的键数量5) "radix-tree-nodes"6) (integer) 2 //Radix Tree 中的节点数量,反映了树的复杂度7) "groups"8) (integer) 2 //与此 Stream 相关的消费组(Consumer Group)数量9) "last-generated-id"
10) "1608049761947-0" //Stream 中最后一个生成的消息的 ID
11) "first-entry"
12) 1) "1608049732151-0"2) 1) "name"2) "item1"
13) "last-entry"
14) 1) "1608049761947-0"2) 1) "name"2) "item5"
2,stream队列消息消费
由于redis的stream流主要是借鉴于kafka,因此其内部消费方式和kafka一样,主要有单消费者和消费者组 。由于上面已经往 product_queue 队列中投递了消息,因此接下来主要讲解消息如何被消费
2.1,单消费者
单消费者也比较好理解,就是此时不属于任何一个消费者组中的消费者。其消费方式如下,可以直接通过 xread 的方式进行消息的读取
xread count 1 streams product_queue 0-0
- count表示读取消息的条数,比如后面接1表示只读取一条数据
- streams表示一个关键字,需要配合xread使用
- 0-0前面这个0表示读取队列最开始的消息,后面这个0表示只读取一条数据

除了从前面读取消息之外,也可以直接从后面开始读取数据,可以直接通过 $ 解决
xread count 1 streams product_queue $ //直接读取
xread block 0 count 1 streams product_queue $ //阻塞式读取
但是直接通过这种单消费者方式实现消息消费的话,也存在着一定的缺陷,因为单消费者消费消息,其消费完成的偏移量是需要手动实现提交的,因此单消费者实现消息消费会比较的复杂。
2.2,消费者组
2.2.1,订阅消费者组
上面提到了单消费者实现消息消费需要手动的提交偏移量,这样下次才知道当前消费者的消息消费到了哪里,在redis内部中,已经提供好了一个可以自动实现消息消费后记录偏移量的功能,不需要开发者自行的去实现。
创建消费者组的命令如下,可以通过xgroup 实现,如为 product_queue 的队列创建一个consumer1的消费者组,设置从头开始读取消息
xgroup create product_queue consumer1 0-0 //从前面开始消费消息
也可以创建一个名称为consumer2的消费者组,从后面开始读取消息
xgroup create product_queue consumer2 $ //从后面开始消费消息
可以直接通过 xinfo groups 命令来查看该队列对应的全部的消费者组的信息,可以发现此时已经有两个消费者组
xinfo groups product_queue

2.2.2,消息消费
在实现完消息订阅之后,由于kafka设计的理念是,一个分区下的消息只能被消费者组中的一条消息消费,因此redis中stream流的设计理念也一样。
其消息消费的命令如下,通过 xreadgroup 实现消费者组消费,GROUP表示一个关键字,需要和xreadGroup结合使用,consumer1表示一个消费者组,c1表示消费者中的任意一个消费者,count 1表示只消费一条消息,最后面的 > 表示获取的消息通过 last_delivered_id 后一条开始消费
xreadgroup GROUP consumer1 c1 count 1 streams product_queue >

在消息被消费完成之后,再来查看一下消费者组的详细消息,在上面执行这个命令时此时的consumer1消费者组对应的value值是0-0,当有消息被消费之后,这个消费者组对应的 last_delivered_id 就发生了改变,其指针往后移动了一位

其偏移量主要就是通过这个 last_delivered_id 来解决的,每次消费一条消息,偏移量就会往后移动一位,这样就能解决消息重复消费的问题,也不需要像单消费者一样需要手动去记录消息消费完后偏移量的记录。
也可以直接通过命令查看消费者本身的消息,通过 xinfo comsumers 结合使用,查看哪个队列下面的那个消费者组,可以发现此时有一个c1的消费者进行消费,并且有一条消息处于pending未确认的状态
xinfo consumers product_queue consumer1

当然也可以通过命令的方式手动的进行消息消费的确认机制,通过xack的机制进行手动的确认,再次查询这个消费者详细信息之后,可以发现此时处于pending未确认的状态的消息已经被确认了,此时的值为0
xack product_queue consumer1 1728141022160-0

3,stream出现之前如何实现消息中间件
3.1,list类型实现
在list的数据类型中,可以通过Lpush+Rprop的方式实现消息中间件,其原理也比较简单,生产者从列表的左边加入消息,消费者从列表的右边消费消息, 这样保证了消息先进先出(FIFO)原则,适用于简单的消息队列系统 。
- 生产者:使用
LPUSH向列表的左边插入消息 - 消费者:使用
RPOP从列表的右边消费消息。
但是这种数据类型也存在缺陷,只能适用于小型轻量级、快速开发的场景,如果遇到了高并发场景,或者消息需要手动确认机制等场景,那么这种list方式就不太合适

3.2, Pub/Sub 发布订阅模式
redis内部也提供了一种发布订阅的模式,其简单使用如下,就是通过publish发送消息,subscribe接收消息
PUBLISH channel message: 发布者通过该命令向 channel 发布 message。
SUBSCRIBE channel: 订阅者通过该命令订阅 channel,并接收其发布的消息。
在redis中,这种发布订阅一般比较的适用于实时场景,如实时消息推送,聊天等场景。但是缺陷也明显:
- 首先内部并没有提供持久化机制,意味着数据会丢失
- 其次内部也没有提供消息小人机制,某些高可靠场景不适合
- 消息堆积很可能造成redis宕机问题

4,stream底层设计及优化
再讲解上面的基本使用之后,再来看这幅图就比较简单。

4.1,队列设置最大值
在redis中一个队列的大小也可以设置最大值,防止因为队列太长导致内存不足而而宕机。再创建队列时可以直接通过 MAXLEN 设置最大值,并且可以通过一个 ~ 设置成一个近似值,也可以不加这个近似值成为一个精确值
//最大值设置成1000,并且是一个近似值
xadd product_queue MAXLEN ~ 1000 * type xiaomi name xiaomi8//最大值设置成1000,并且是一个精确值
xadd product_queue MAXLEN 1000 * type xiaomi name xiaomi8
近似值往往可以更加灵活,在性能上高于精准值。当达到或者超过这个设置的值的时候,redis就会触发内存淘汰策略将数据淘汰。
4.2,使用消费者组
在消费者端应该直接考虑使用消费者组而不是单消费者,每个消费者组内部有一个 last_delivered_id ,可以通过这个字段记录对应的偏移量,这样如果出现宕机或者重启等情况,就能知道消费者消费到了哪个偏移量上面,从而从根本上解决一些消息重复消费等情况
4.3,消息的应答机制
在每个消费者组中,都会有一个pendding ids的数组,这个数组会记录所有未应答的消息id,可以通过确认数组中的id来保证消息确实被消费。当消息长时间不被ack应答时,也会被触发内存淘汰策略被淘汰
4.4,优化点
如果设计的队列太多,可以考虑部署一些cluster集群、哨兵+主从集群来保证整个系统的高可用和高性能。如果一个队列中的消息被大量的生产和消费,可以考虑 写热点分散 的方式将数据多分布在几个队列里面,然后通过hash或者轮询等方式进行消息的消费
相关文章:
【redis-06】redis的stream流实现消息中间件
redis系列整体栏目 内容链接地址【一】redis基本数据类型和使用场景https://zhenghuisheng.blog.csdn.net/article/details/142406325【二】redis的持久化机制和原理https://zhenghuisheng.blog.csdn.net/article/details/142441756【三】redis缓存穿透、缓存击穿、缓存雪崩htt…...
二、MySQL的数据目录
文章目录 1. MySQL8的主要目录结构1.1 数据库文件的存放路径1.2 相关命令目录1.3 配置文件目录 2. 数据库和文件系统的关系2.1 查看默认数据库2.2 数据库在文件系统中的表示2.3 表在文件系统中的表示2.3.1 InnoDB存储引擎模式2.3.2 MyISAM存储引擎模式 2.4 小结 1. MySQL8的主要…...
2024.10月7~10日 进一步完善《电信资费管理系统》
一、新增的模块: 在原项目基础上,新增加了以下功能: 1、增加AspectJ 框架的AOP 异常记录和事务管理模块。 2、增加SpringMVC的拦截器,实现登录 控制页面访问权限。 3、增加 Logback日志框架,记录日志。 4、增加动态验…...
vue2项目的路由使用history模式,刷新会导致页面404的问题
在vue2项目中,如果我们使用的路由是history模式,刷新会导致页面404,解决方法很简单,在vue.config.js文件中的devServer下增加historyApiFallback: true; 代码如下: module.exports {devServer: {historyApiFallback: true,} }...
pytest框架之fixture测试夹具详解
前言 大家下午好呀,今天呢来和大家唠唠pytest中的fixtures夹具的详解,废话就不多说了咱们直接进入主题哈。 一、fixture的优势 pytest框架的fixture测试夹具就相当于unittest框架的setup、teardown,但相对之下它的功能更加强大和灵活。 …...
【浏览器】如何正确使用Microsoft Edge
1、清理主页广告 如今的Microsoft Edge 浏览器 主页太乱了,各种广告推送,点右上角⚙️设置,把快速链接、网站导航、信息提要、背景等全部关闭。这样你就能得到一个超级清爽的主页。 网站导航 关闭 …...
打印1000年到2000年之间的闰年
我们要打印1000年到2000年之间的闰年,首先我们先输出1000年到2000年之间的所有的年份,同时我们将闰年的判断方法输入到其中 闰年需要满足下列两个条件的其中之一: 1.能被4整除但不能被100整除 2.能被400整除 打印1000年到2000年之间的闰年…...
nn.Identity()
在 PyTorch 中,nn.Identity()是一个简单的模块,它的作用是在模型中作为一个占位符或者不进行任何操作的层,直接返回输入。 一、使用方法 以下是一个简单的使用示例: import torch import torch.nn as nn# 创建一个 Identity 层…...
Java 快速排序
快速排序(Quicksort)是一种高效的排序算法,采用分治法(Divide and Conquer)的策略来把一个序列分为较小和较大的两个子序列,然后递归地排序两个子序列。以下是用Java实现的快速排序算法: publi…...
51单片机的智能衣柜【proteus仿真+程序+报告+原理图+演示视频】
1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块光照传感器时钟模块温湿度传感器继电器按键、LED等模块构成。适用于智能衣柜、智能衣橱、紫外线定时消毒等相似项目。 可实现功能: 1、LCD1602实时显示北京时间、温湿度和开关门状态 2、时钟模块DS1302采集时间 …...
SAP_FI_表ACDOCA取代的表
在 SAP S/4HANA 系统中,ACDOCA(通用分录表,Universal Journal)引入了全新的数据结构,取代了原先 ERP 系统中多个财务和控制模块的表。ACDOCA 通过一个单一表格整合了财务会计(FI)和管理会计&…...
论文《OneLLM:One Framework to Align All Modalities with Language》
(没有会员只有做100个节点,mindmaster金主爸爸可不可以给我一个会员啊啊啊啊呜呜呜~) 欣赏论文的图和表: 表中作者将自己的模型那一行选择灰色作为背景,更加凸显自己的数据,另外对于最好的结果用加粗黑体…...
Ubuntu 22.04.4 LTS更换下载源
方法1:使用图形界面更换下载源 1. 打开软件和更新应用 2. 在Ubuntu 软件标签中,点击“下载自”旁边的下拉菜单,选择“其他” 3. 点击“选择最佳服务器”来自动选择最快的服务器 4. 选择服务器 5. 确定并关闭窗口,系统会提示您重新…...
html嵌入百度地图
html嵌入百度地图 key地址 https://lbsyun.baidu.com/apiconsole/key#/home ,点进去注册应用、然后复制key换掉即可显示地图 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>百度地图搜索…...
【网络】详解TCP协议中的可靠传输
【网络】详解TCP协议中的可靠传输 一. TCP协议段格式二. 确认应答——确保可靠性的核心机制1.确保时序2.确保发送方知道数据是否被对方接收到 三. 超时重传1. 发送的数据丢包2. ACK报文丢失 一. TCP协议段格式 TCP协议段格式相比UDP要复杂很多,很多内容需要我们了解…...
【Python实例】Python读取并绘制nc数据
【Python实例】Python读取并绘制nc数据 准备:安装netCDF库等读取nc数据相关信息绘制图形利用basemap绘图 参考 准备:安装netCDF库等 以【1960-2020年中国1km分辨率月降水数据集】中2020年降水为例。 先在Panopoly中查看数据属性,如下&#…...
swift使用llama3.2-vision微调xray数据集
1.数据格式 [{"query": "通过这张胸部X光影像可以诊断出什么?","response": "根据X射线图像,心脏大小正常,肺部看起来很清晰。已经排除了肺炎、积液、水肿、气胸、腺病、结节或肿块的存在。该发现表明一切正常。换句话说,总体印象是胸…...
学习小课堂
1.多服务节点下Session-Cooki方案如何做? Session-Cookie 方案在单体环境是一个非常好的身份认证方案。但是,当服务器水平拓展成多节点时,Session-Cookie 方案就要面临挑战了。 举个例子:假如我们部署了两份相同的服务 A&#x…...
stm32学习笔记-RTC实时时钟
文章目录 一、RTC基础知识1.1 RTC简介1.2 RTC的晶振 二、stm32的RTC2.1 RTC和后备寄存器2.2 stm32 RTC结构框图及特性 三、stm32 RTC编程2.1 RTC初始化2.2 RTC控制程序 一、RTC基础知识 1.1 RTC简介 实时时钟的缩写是RTC(Real_Time Clock)。RTC 是集成电路,通常称…...
简历中的期望薪资怎么定?
在简历中撰写期望薪资时,既要体现你的价值认知,又要保持一定的灵活性和开放性,以便在后续的面试和薪资谈判中留有余地。以下是一些撰写期望薪资的合理方法: 一、明确薪资范围 1.市场调研: 在撰写期望薪资前…...
如何用轻量工具实现Windows 11系统深度净化?
如何用轻量工具实现Windows 11系统深度净化? 【免费下载链接】Win11Debloat 一个简单的PowerShell脚本,用于从Windows中移除预装的无用软件,禁用遥测,从Windows搜索中移除Bing,以及执行各种其他更改以简化和改善你的Wi…...
普冉PY32F071内存紧张?FreeRTOS配置优化全攻略(含heap_4选择与任务栈设置)
普冉PY32F071内存紧张?FreeRTOS配置优化全攻略(含heap_4选择与任务栈设置) 当你在PY32F071这颗Cortex-M0芯片上运行FreeRTOS时,是否遇到过任务莫名崩溃、系统运行不稳定的情况?作为一款仅有20KB RAM的微控制器…...
从Provisional headers are shown到证书过期:uniapp请求无响应的幕后真相
从Provisional headers are shown到证书过期:uniapp请求无响应的深度排查指南 当你正在调试一个运行良好的uniapp项目时,突然发现所有网络请求在真机上毫无征兆地停止工作——没有错误提示,没有响应数据,只有开发者工具中冷冰冰的…...
告别本地编译卡顿:用CLion+Docker容器实现丝滑的Linux远程C++开发(保姆级教程)
告别本地编译卡顿:用CLionDocker容器实现丝滑的Linux远程C开发(保姆级教程) 在Windows或Mac上开发Linux C项目时,你是否经历过这些困扰:本地交叉编译环境配置复杂、编译速度缓慢、依赖冲突频发,或是开发环境…...
Gpmall分布式事务处理:订单创建与库存扣减的最终一致性保障
Gpmall分布式事务处理:订单创建与库存扣减的最终一致性保障 【免费下载链接】gpmall 项目地址: https://gitcode.com/gh_mirrors/gp/gpmall 在电商系统中,订单创建与库存扣减的分布式事务处理是确保数据一致性的核心挑战。Gpmall项目通过创新的P…...
HarmonyOS6 半年磨一剑 - RcTextarea 组件核心架构与类型系统设计
文章目录前言一、组件整体架构1.1 文件结构1.2 装饰器体系二、类型系统深度解析2.1 边框模式类型2.2 清空触发类型2.3 格式化与解析函数类型2.4 文本对齐与回车键类型三、核心参数体系3.1 必传参数3.2 尺寸相关参数3.3 功能开关参数四、内部状态与生命周期4.1 内部状态设计4.2 …...
华硕笔记本终极电池拯救指南:用G-Helper实现智能充电与健康修复
华硕笔记本终极电池拯救指南:用G-Helper实现智能充电与健康修复 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops. Control tool for ROG Zephyrus G14, G15, G16, M16, Flow X13, Flow X16, TUF, Strix, Scar and other models …...
基于粒子群优化算法的地表水源热泵机组优化调度 以水源热泵机组角度对地表水源热泵系统建模
基于粒子群优化算法的地表水源热泵机组优化调度 以水源热泵机组角度对地表水源热泵系统建模, 并采用粒子群优化算法优化算法求解热泵机组每小时最佳制冷量和制热量 最近帮朋友做了个小区地表水源热泵的调度优化项目,一开始以为就是调调空调温度…...
医学影像与卫星图的救星?深入聊聊JPEG-LS算法在边缘计算设备上的应用优势
JPEG-LS算法:边缘计算时代的医学影像与卫星图像压缩利器 当一台CT扫描仪每秒产生数百张16位深度的医学影像,或一颗遥感卫星每天传回数TB的高清地表数据时,传统的图像压缩方案往往面临两难选择——要么牺牲宝贵的诊断细节,要么耗尽…...
算法优化中的寄存器重用与内存映射策略的技术6
寄存器重用与内存映射策略在算法优化中的重要性寄存器重用和内存映射是提升计算密集型算法性能的关键技术,通过减少数据访问延迟和优化存储层次结构的使用,显著提高执行效率。寄存器重用的核心方法与技术数据局部性利用 通过循环展开(Loop Un…...
