【redis-06】redis的stream流实现消息中间件
redis系列整体栏目
内容 | 链接地址 |
---|---|
【一】redis基本数据类型和使用场景 | https://zhenghuisheng.blog.csdn.net/article/details/142406325 |
【二】redis的持久化机制和原理 | https://zhenghuisheng.blog.csdn.net/article/details/142441756 |
【三】redis缓存穿透、缓存击穿、缓存雪崩 | https://zhenghuisheng.blog.csdn.net/article/details/142577507 |
【四】redisson实现分布式锁实战和源码剖析 | https://zhenghuisheng.blog.csdn.net/article/details/142646301 |
【五】redis保证和mysql数据一致性 | https://zhenghuisheng.blog.csdn.net/article/details/142687101 |
【六】redis的stream流实现消息中间件 | https://zhenghuisheng.blog.csdn.net/article/details/142721269 |
如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142721269
redis的stream流实现消息中间件
- 一,redis的新特性-队列stream
- 1,redis的stream流基本使用
- 2,stream队列消息消费
- 2.1,单消费者
- 2.2,消费者组
- 2.2.1,订阅消费者组
- 2.2.2,消息消费
- 3,stream出现之前如何实现消息中间件
- 3.1,list类型实现
- 3.2, Pub/Sub 发布订阅模式
- 4,stream底层设计及优化
- 4.1,队列设置最大值
- 4.2,使用消费者组
- 4.3,消息的应答机制
- 4.4,优化点
一,redis的新特性-队列stream
在了解这个redis的新特性之前,可以先查看一下官网的详细文档:stream流官方文档
redis的stream流是从5.0版本才开始提出,本人这里安装的是 6.2.6 版本。它的底层原理是借鉴于kafka的底层实现,因此可以参考本人前面的写的kafka的文章。redis 的stream流队列其主要组件有:消息队列、生产者、消费者、消费者组、消息及消息id、偏移量等
建立这个stream的主要原因,是作者想通过这个redis来取代mq那些中间件,redis在项目中时必不可少的,但是引入mq就会多引入一个第三方的中间件,让系统稳定性没那么高,mq一挂就有可能导致整个系统瘫痪
1,redis的stream流基本使用
创建一个stream队列的命令如下,通过xadd的方式实现往队列中添加消息。如下创建一个商品的队列,然后设置商品type类型为小米手机,商品名称name为小米8,得到的结果如下图
xadd product_queue * type xiaomi name xiaomi8
product_quque 表示队列的名称,***** 表示由服务器自动生成一个id,其id通过时间戳+序号(毫秒时间内第n条消息)
可以直接通过 xlen 命令查看队列的长度,可以发现已经队列的长度为4
xlen product_queue
也可以直接通过 xrange 命令将全部的消息展示出来,在后面需要加上 - + 两个命令操作符,也可以在后面加一个id来设置范围 。
- - 表示在这个队列中的最小的id,
- + 表示在这个队列里面最大的id
xrange product_queue - +
xrange product_queue - 1728139368101-0 //获取前两个
xrange product_queue 1728139374509-0 + //获取后两个
删除命令也比较简单,可以直接通过 xdel 命令实现删除,执行完命令之后,可以发现队列中的数据已被删除。但是这个xdel使用的是逻辑删除消息,而不是物理删除。
xdel product_queue 1728139368101-0
也可以查看整个stream队列的详细信息,可以直接通过 xinfo 命令来实现。其返回信息如下,length表示返回4条数据,
xinfo stream product_queue
返回的消息如下,会将整个队列的信息详细的返回,并且根据不同的redis版本返回一些不同的额外参数
127.0.0.1:6379> XINFO STREAM product_queue1) "length" 2) (integer) 5 //表示5条数据3) "radix-tree-keys" 4) (integer) 1 //用于存储 Stream 元素的 Radix Tree 中的键数量5) "radix-tree-nodes"6) (integer) 2 //Radix Tree 中的节点数量,反映了树的复杂度7) "groups"8) (integer) 2 //与此 Stream 相关的消费组(Consumer Group)数量9) "last-generated-id"
10) "1608049761947-0" //Stream 中最后一个生成的消息的 ID
11) "first-entry"
12) 1) "1608049732151-0"2) 1) "name"2) "item1"
13) "last-entry"
14) 1) "1608049761947-0"2) 1) "name"2) "item5"
2,stream队列消息消费
由于redis的stream流主要是借鉴于kafka,因此其内部消费方式和kafka一样,主要有单消费者和消费者组 。由于上面已经往 product_queue 队列中投递了消息,因此接下来主要讲解消息如何被消费
2.1,单消费者
单消费者也比较好理解,就是此时不属于任何一个消费者组中的消费者。其消费方式如下,可以直接通过 xread 的方式进行消息的读取
xread count 1 streams product_queue 0-0
- count表示读取消息的条数,比如后面接1表示只读取一条数据
- streams表示一个关键字,需要配合xread使用
- 0-0前面这个0表示读取队列最开始的消息,后面这个0表示只读取一条数据
除了从前面读取消息之外,也可以直接从后面开始读取数据,可以直接通过 $ 解决
xread count 1 streams product_queue $ //直接读取
xread block 0 count 1 streams product_queue $ //阻塞式读取
但是直接通过这种单消费者方式实现消息消费的话,也存在着一定的缺陷,因为单消费者消费消息,其消费完成的偏移量是需要手动实现提交的,因此单消费者实现消息消费会比较的复杂。
2.2,消费者组
2.2.1,订阅消费者组
上面提到了单消费者实现消息消费需要手动的提交偏移量,这样下次才知道当前消费者的消息消费到了哪里,在redis内部中,已经提供好了一个可以自动实现消息消费后记录偏移量的功能,不需要开发者自行的去实现。
创建消费者组的命令如下,可以通过xgroup 实现,如为 product_queue 的队列创建一个consumer1的消费者组,设置从头开始读取消息
xgroup create product_queue consumer1 0-0 //从前面开始消费消息
也可以创建一个名称为consumer2的消费者组,从后面开始读取消息
xgroup create product_queue consumer2 $ //从后面开始消费消息
可以直接通过 xinfo groups 命令来查看该队列对应的全部的消费者组的信息,可以发现此时已经有两个消费者组
xinfo groups product_queue
2.2.2,消息消费
在实现完消息订阅之后,由于kafka设计的理念是,一个分区下的消息只能被消费者组中的一条消息消费,因此redis中stream流的设计理念也一样。
其消息消费的命令如下,通过 xreadgroup 实现消费者组消费,GROUP表示一个关键字,需要和xreadGroup结合使用,consumer1表示一个消费者组,c1表示消费者中的任意一个消费者,count 1表示只消费一条消息,最后面的 > 表示获取的消息通过 last_delivered_id 后一条开始消费
xreadgroup GROUP consumer1 c1 count 1 streams product_queue >
在消息被消费完成之后,再来查看一下消费者组的详细消息,在上面执行这个命令时此时的consumer1消费者组对应的value值是0-0,当有消息被消费之后,这个消费者组对应的 last_delivered_id 就发生了改变,其指针往后移动了一位
其偏移量主要就是通过这个 last_delivered_id 来解决的,每次消费一条消息,偏移量就会往后移动一位,这样就能解决消息重复消费的问题,也不需要像单消费者一样需要手动去记录消息消费完后偏移量的记录。
也可以直接通过命令查看消费者本身的消息,通过 xinfo comsumers 结合使用,查看哪个队列下面的那个消费者组,可以发现此时有一个c1的消费者进行消费,并且有一条消息处于pending未确认的状态
xinfo consumers product_queue consumer1
当然也可以通过命令的方式手动的进行消息消费的确认机制,通过xack的机制进行手动的确认,再次查询这个消费者详细信息之后,可以发现此时处于pending未确认的状态的消息已经被确认了,此时的值为0
xack product_queue consumer1 1728141022160-0
3,stream出现之前如何实现消息中间件
3.1,list类型实现
在list的数据类型中,可以通过Lpush+Rprop的方式实现消息中间件,其原理也比较简单,生产者从列表的左边加入消息,消费者从列表的右边消费消息, 这样保证了消息先进先出(FIFO)原则,适用于简单的消息队列系统 。
- 生产者:使用
LPUSH
向列表的左边插入消息 - 消费者:使用
RPOP
从列表的右边消费消息。
但是这种数据类型也存在缺陷,只能适用于小型轻量级、快速开发的场景,如果遇到了高并发场景,或者消息需要手动确认机制等场景,那么这种list方式就不太合适
3.2, Pub/Sub 发布订阅模式
redis内部也提供了一种发布订阅的模式,其简单使用如下,就是通过publish发送消息,subscribe接收消息
PUBLISH channel message
: 发布者通过该命令向 channel
发布 message
。
SUBSCRIBE channel
: 订阅者通过该命令订阅 channel
,并接收其发布的消息。
在redis中,这种发布订阅一般比较的适用于实时场景,如实时消息推送,聊天等场景。但是缺陷也明显:
- 首先内部并没有提供持久化机制,意味着数据会丢失
- 其次内部也没有提供消息小人机制,某些高可靠场景不适合
- 消息堆积很可能造成redis宕机问题
4,stream底层设计及优化
再讲解上面的基本使用之后,再来看这幅图就比较简单。
4.1,队列设置最大值
在redis中一个队列的大小也可以设置最大值,防止因为队列太长导致内存不足而而宕机。再创建队列时可以直接通过 MAXLEN 设置最大值,并且可以通过一个 ~ 设置成一个近似值,也可以不加这个近似值成为一个精确值
//最大值设置成1000,并且是一个近似值
xadd product_queue MAXLEN ~ 1000 * type xiaomi name xiaomi8//最大值设置成1000,并且是一个精确值
xadd product_queue MAXLEN 1000 * type xiaomi name xiaomi8
近似值往往可以更加灵活,在性能上高于精准值。当达到或者超过这个设置的值的时候,redis就会触发内存淘汰策略将数据淘汰。
4.2,使用消费者组
在消费者端应该直接考虑使用消费者组而不是单消费者,每个消费者组内部有一个 last_delivered_id ,可以通过这个字段记录对应的偏移量,这样如果出现宕机或者重启等情况,就能知道消费者消费到了哪个偏移量上面,从而从根本上解决一些消息重复消费等情况
4.3,消息的应答机制
在每个消费者组中,都会有一个pendding ids的数组,这个数组会记录所有未应答的消息id,可以通过确认数组中的id来保证消息确实被消费。当消息长时间不被ack应答时,也会被触发内存淘汰策略被淘汰
4.4,优化点
如果设计的队列太多,可以考虑部署一些cluster集群、哨兵+主从集群来保证整个系统的高可用和高性能。如果一个队列中的消息被大量的生产和消费,可以考虑 写热点分散 的方式将数据多分布在几个队列里面,然后通过hash或者轮询等方式进行消息的消费
相关文章:

【redis-06】redis的stream流实现消息中间件
redis系列整体栏目 内容链接地址【一】redis基本数据类型和使用场景https://zhenghuisheng.blog.csdn.net/article/details/142406325【二】redis的持久化机制和原理https://zhenghuisheng.blog.csdn.net/article/details/142441756【三】redis缓存穿透、缓存击穿、缓存雪崩htt…...

二、MySQL的数据目录
文章目录 1. MySQL8的主要目录结构1.1 数据库文件的存放路径1.2 相关命令目录1.3 配置文件目录 2. 数据库和文件系统的关系2.1 查看默认数据库2.2 数据库在文件系统中的表示2.3 表在文件系统中的表示2.3.1 InnoDB存储引擎模式2.3.2 MyISAM存储引擎模式 2.4 小结 1. MySQL8的主要…...

2024.10月7~10日 进一步完善《电信资费管理系统》
一、新增的模块: 在原项目基础上,新增加了以下功能: 1、增加AspectJ 框架的AOP 异常记录和事务管理模块。 2、增加SpringMVC的拦截器,实现登录 控制页面访问权限。 3、增加 Logback日志框架,记录日志。 4、增加动态验…...

vue2项目的路由使用history模式,刷新会导致页面404的问题
在vue2项目中,如果我们使用的路由是history模式,刷新会导致页面404,解决方法很简单,在vue.config.js文件中的devServer下增加historyApiFallback: true; 代码如下: module.exports {devServer: {historyApiFallback: true,} }...

pytest框架之fixture测试夹具详解
前言 大家下午好呀,今天呢来和大家唠唠pytest中的fixtures夹具的详解,废话就不多说了咱们直接进入主题哈。 一、fixture的优势 pytest框架的fixture测试夹具就相当于unittest框架的setup、teardown,但相对之下它的功能更加强大和灵活。 …...

【浏览器】如何正确使用Microsoft Edge
1、清理主页广告 如今的Microsoft Edge 浏览器 主页太乱了,各种广告推送,点右上角⚙️设置,把快速链接、网站导航、信息提要、背景等全部关闭。这样你就能得到一个超级清爽的主页。 网站导航 关闭 …...

打印1000年到2000年之间的闰年
我们要打印1000年到2000年之间的闰年,首先我们先输出1000年到2000年之间的所有的年份,同时我们将闰年的判断方法输入到其中 闰年需要满足下列两个条件的其中之一: 1.能被4整除但不能被100整除 2.能被400整除 打印1000年到2000年之间的闰年…...

nn.Identity()
在 PyTorch 中,nn.Identity()是一个简单的模块,它的作用是在模型中作为一个占位符或者不进行任何操作的层,直接返回输入。 一、使用方法 以下是一个简单的使用示例: import torch import torch.nn as nn# 创建一个 Identity 层…...

Java 快速排序
快速排序(Quicksort)是一种高效的排序算法,采用分治法(Divide and Conquer)的策略来把一个序列分为较小和较大的两个子序列,然后递归地排序两个子序列。以下是用Java实现的快速排序算法: publi…...

51单片机的智能衣柜【proteus仿真+程序+报告+原理图+演示视频】
1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块光照传感器时钟模块温湿度传感器继电器按键、LED等模块构成。适用于智能衣柜、智能衣橱、紫外线定时消毒等相似项目。 可实现功能: 1、LCD1602实时显示北京时间、温湿度和开关门状态 2、时钟模块DS1302采集时间 …...

SAP_FI_表ACDOCA取代的表
在 SAP S/4HANA 系统中,ACDOCA(通用分录表,Universal Journal)引入了全新的数据结构,取代了原先 ERP 系统中多个财务和控制模块的表。ACDOCA 通过一个单一表格整合了财务会计(FI)和管理会计&…...

论文《OneLLM:One Framework to Align All Modalities with Language》
(没有会员只有做100个节点,mindmaster金主爸爸可不可以给我一个会员啊啊啊啊呜呜呜~) 欣赏论文的图和表: 表中作者将自己的模型那一行选择灰色作为背景,更加凸显自己的数据,另外对于最好的结果用加粗黑体…...

Ubuntu 22.04.4 LTS更换下载源
方法1:使用图形界面更换下载源 1. 打开软件和更新应用 2. 在Ubuntu 软件标签中,点击“下载自”旁边的下拉菜单,选择“其他” 3. 点击“选择最佳服务器”来自动选择最快的服务器 4. 选择服务器 5. 确定并关闭窗口,系统会提示您重新…...

html嵌入百度地图
html嵌入百度地图 key地址 https://lbsyun.baidu.com/apiconsole/key#/home ,点进去注册应用、然后复制key换掉即可显示地图 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>百度地图搜索…...

【网络】详解TCP协议中的可靠传输
【网络】详解TCP协议中的可靠传输 一. TCP协议段格式二. 确认应答——确保可靠性的核心机制1.确保时序2.确保发送方知道数据是否被对方接收到 三. 超时重传1. 发送的数据丢包2. ACK报文丢失 一. TCP协议段格式 TCP协议段格式相比UDP要复杂很多,很多内容需要我们了解…...

【Python实例】Python读取并绘制nc数据
【Python实例】Python读取并绘制nc数据 准备:安装netCDF库等读取nc数据相关信息绘制图形利用basemap绘图 参考 准备:安装netCDF库等 以【1960-2020年中国1km分辨率月降水数据集】中2020年降水为例。 先在Panopoly中查看数据属性,如下&#…...

swift使用llama3.2-vision微调xray数据集
1.数据格式 [{"query": "通过这张胸部X光影像可以诊断出什么?","response": "根据X射线图像,心脏大小正常,肺部看起来很清晰。已经排除了肺炎、积液、水肿、气胸、腺病、结节或肿块的存在。该发现表明一切正常。换句话说,总体印象是胸…...

学习小课堂
1.多服务节点下Session-Cooki方案如何做? Session-Cookie 方案在单体环境是一个非常好的身份认证方案。但是,当服务器水平拓展成多节点时,Session-Cookie 方案就要面临挑战了。 举个例子:假如我们部署了两份相同的服务 A&#x…...

stm32学习笔记-RTC实时时钟
文章目录 一、RTC基础知识1.1 RTC简介1.2 RTC的晶振 二、stm32的RTC2.1 RTC和后备寄存器2.2 stm32 RTC结构框图及特性 三、stm32 RTC编程2.1 RTC初始化2.2 RTC控制程序 一、RTC基础知识 1.1 RTC简介 实时时钟的缩写是RTC(Real_Time Clock)。RTC 是集成电路,通常称…...

简历中的期望薪资怎么定?
在简历中撰写期望薪资时,既要体现你的价值认知,又要保持一定的灵活性和开放性,以便在后续的面试和薪资谈判中留有余地。以下是一些撰写期望薪资的合理方法: 一、明确薪资范围 1.市场调研: 在撰写期望薪资前…...

MySQL 中的 GROUP BY 使用
MySQL 中的 GROUP BY 使用指南 GROUP BY 是 SQL 中一个非常强大的语句,用于将查询结果按指定的列进行分组,并对每个分组执行聚合函数。它常常与聚合函数(如 COUNT、SUM、AVG、MIN 和 MAX)结合使用,以生成汇总信息。 …...

在 ubantu 20.04 云服务器上基于 bochs 编译 linux0.11
安装 bochs 将下面的命令全部执行一遍: sudo apt-get install build-essential sudo apt-get install xorg-dev sudo apt-get install bison sudo apt-get install g 我们区官网下载一下bochs的源码:bochs下载 这里我下载好了bochs2.6.8 这个版本的…...

docker-compose安装部署和使用
docker-compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务 1.docker-compose安装 github上下载二进制文…...

Java之静态
静态: 使用 static 关键字声明的成分属于类本身,而不是类的任何特定对象的实例。这意味着你可以在创建类的任何对象之前访问它们。 静态变量: 静态变量(也称为类变量)是被类的所有实例共享的变量。无论你创建多少对象…...

PCB缺陷检测数据集 xml 可转yolo格式 ,共10688张图片
PCB缺陷检测数据集(yolov5,v7,v8) 数据集总共有两个文件夹,一个是pcb整体标注,一个是pcb部分截图。 整体标注有6个分类,开路,短路等都已经标注,标注格式为xml,每个文件夹下有100多张…...

【linux开发-驱动】-设备树
一、什么是设备树 描述设备树的文件叫做DTS(Device Tree Source),采用树形结构描述板级设备,也就是开发板上的设备信息,比如IIC接口上接了那些设备,内存基地址等 树的主干就是系统总线,枝干就…...

不动产证ocr识别场景解析、房产证识别API
不动产证OCR识别、房产证识别接口是通过光学字符识别技术(OCR)从不动产证书的图像或扫描件中自动提取关键信息的技术应用。该场景的主要目标是提高信息录入的效率,减少人工输入的错误,并能自动化处理大量不动产证书、房产证的数据…...

gpg 密钥生成、导入、导出、自动输入密码
目录 一、系统环境 二、常用命令(以签名密钥为例) (1)生成密钥 (2)列出私钥 (3)列出公钥 (4)导出公钥 (5)导出私钥 ÿ…...

新个性化时尚解决方案!Prompt2Fashion:自动生成多风格、类型时尚图像数据集。
今天给大家介绍一种自动化生成时尚图像数据的方法Prompt2Fashion。 首先创建了一组描述,比如“适合婚礼的休闲风格服装”,然后用这些描述来指导计算机生成图像。具体来说,他们使用了大型语言模型来写出这些服装的描述,接着将这些描…...

软件设计师——计算机网络
📔个人主页📚:秋邱-CSDN博客☀️专属专栏✨:软考——软件设计师🏅往期回顾🏆:软件设计师——操作系统🌟其他专栏🌟:C语言_秋邱 一、OSI/ RM七层模型(⭐⭐⭐)…...