当前位置: 首页 > news >正文

Redis 篇-深入了解基于 Redis 实现消息队列(比较基于 List 实现消息队列、基于 PubSub 发布订阅模型之间的区别)

🔥博客主页: 【小扳_-CSDN博客】
❤感谢大家点赞👍收藏⭐评论✍

文章目录

        1.0 消息队列的认识

        2.0 基于 List 实现消息队列

        2.1 基于 List 实现消息队列的优缺点

        3.0 基于 PubSub 实现消息队列

        3.1 基于 PubSub 的消息队列优缺点

        4.0 基于 Stream 实现消息队列

        4.1 Stream 的单消费模式

        4.2 Stream 的消费组模式


        1.0 消息队列的认识

        消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包含 3 个角色:

        1)消息队列:存储和管理消息,也被称为消息代理(Message Broker)

        2)生产者:发送消息到消息队列。

        3)消费者:从消息队列获取消息并处理消息。

        2.0 基于 List 实现消息队列

        Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。

实现思路:

        队列时入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP 或者 RPUSH 结合 LPOP 来实现。不过需要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会直接返回 null ,并不像 JVM 的阻塞队列那样会阻塞并等待消息,因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。

代码演示:

        当数据要进入队列时,那么可以使用 LPUSH KEY VALUE 命令,KEY 为队列名称,VALUE 为数据值,将数据写入 Redis 中。当要获取数据的时,使用 BRPOP KEY TIMEOUT命令,KEY 为队列名称,TIMEOUT 为最大阻塞时间,在最大阻塞时间内,仍旧没有获取数据,则返回 null 。该命令主要做了两步,将数据移除队列中,并将该数据返回。

        2.1 基于 List 实现消息队列的优缺点

        优点:

        1)利用 Redis 存储,不受限于 JVM 内存上限。

        2)基于 Redis 的持久化机制,数据安全性有保证。

        3)可以满足消息有序性。

        缺点:

        1)无法避免消息丢失。

        当在 BRPOP 获取数据的时候,出现异常,返回数据失败,从而导致数据丢失。因为数据已经从队列中移除出来了,所以队列中已经不存在之前的数据了。

        2)只支持单消费者。

        当一个消费者来消费之后,其他再来的消费者就不能再获取到第一个消费者的数据,所以说数据只能给一个消费者。

        3.0 基于 PubSub 实现消息队列

        PubSub 是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel ,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

常用的命令:

        1)SUBSCRIBE channel [channel]:订阅一个或者多个频道。

        2)PUBLISH channel msg:向一个频道发送消息。

        3)PSUBSCRIBE pattern [pattern]:订阅与 pattern 格式匹配的所有频道。* 代表通配符,订阅所有频道。

        这就实现了支持多个消费者获取到相同的消息。当消息被发布了,那么已经订阅该频道的消费者就可以及时获取到消息了。

代码演示:

        先订阅频道:

        发送消息:

        当生产者发送完消息,消费者就会收到通知,从通道中获取到消息。

        3.1 基于 PubSub 的消息队列优缺点

        优点:

        1)采用发布订阅模型,支持多生产、多消费。

        解决了基于 List 实现的消息队列的缺点,单消费。

        缺点:

        1)不支持数据持久化。

        将消息发布出去之后,不会进行数据保存。不管有无消费者订阅,都会将消息直接发布出去。

        2)无法避免消息丢失。

        因为不支持持久化,当消息丢失之后,无法再找到原本的数据。

        3)消息堆积有上限,超出时数据丢失。 

        在消费者中,接收到的数据会暂时存放起来,一旦超过存放的大小,就会导致数据丢失。

        4.0 基于 Stream 实现消息队列

        Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

XADD key ID field string [field string ...]

         key 为队列名称,*|ID 为消息的唯一 id,* 代表由 Redis 自动生成。格式是“时间戳-递增数字”,例如 "1644804662707-0"。field value 代表发送到队列中的消息,称为  Entry 。格式就是多个 key-value 键值对。

代码演示:

        4.1 Stream 的单消费模式

单消费者获取数据的命名:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

        COUNT count 为每次读取消息的最大数量;BLOCK milliseconds 代表当没有消息时,是否阻塞,阻塞时长;STREAMS key 代表要从哪个队列读取消息,key 就是队列名;ID 代表起始 id ,只返回大于该 ID 的消息,0 为从第一个消息开始,而 $ 为从最新的消息开始。

代码演示:

        当 ID 使用 $ 时,不会从原本 s 中直接获取原本的数据,而是在 2 秒内有无最新的数据添加进来,如果有,则返回该数据;如果没有,则返回 null。

        当 ID 使用 0 时,则从原本 s 中直接获取原本的数据。

        在业务开发中,我们可以循环的调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

        需要注意的地方:

        当使用 Stream 单消费者模式的时候,我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

        XREAD 命令特点:

        1)消息可回溯。

        2)一个消息可以被多个消费者读取。

        3)可以阻塞读取。

        4)有消息漏读的风险。

        4.2 Stream 的消费组模式

        将多个消费者划分到一个组中,监听同一个队列。

特点:

        1)消息分流:

        队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。

        2)消息标示:

        消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。

        3)消息确认:

        消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list 。当处理完成后,需要通过 XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。

创建消费者组:

XGROUP CREATE key groupname id|$ [MKSTREAM]

        key 代表队列名称,groupName 代表消费者组名称,ID 起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息。MKSTREAMS 代表不存在时自动创建队列。

        如果之前列表的数据要继续获取,则 ID 选为 0;如果之前的列表中的数据不需要了,则 ID 选为 $ 。

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

        group 代表组名,consumer 代表组内消费者名称,count 代表每次读取的最大数量,milliseconds 代表当没有消息时最长的等待时间,NOACK 代表无需手动 ACK,获取消息后自动确认。key 代表指定队列名称,

        ID 代表获取消息的起始 ID :

                当 ID 为 ">" :从下一个未消费的消息开始。

                当 ID 为其他:根据指定 id 从 pending-list 中获取已消费但未确认的消息,例如 0,是从 pending-list 中的第一个消息开始。

确认消息:

XACK key groupName ID

        key 为队列名,groupName 为组名,ID 为消息唯一 id 。

查看未确认的消息:

XPENDING key group [start end count] [consumer]

        key 为队列名,group 为组名,start 起始地址,count 个数,consumer 组内消费者名称。

消费者监听消息思路:

 

Java 代码实现从消息队列中获取消息:

import cn.hutool.core.bean.BeanUtil;
import com.project.volunteermanagementproject.pojo.StreamObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.List;
import java.util.Map;
@Component
public class StreamUtil {@AutowiredStringRedisTemplate stringRedisTemplate;//实现从消息队列中获取消息public void getStream(){while (true){try {List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("s1", ReadOffset.lastConsumed()));if (read == null || read.isEmpty()){//如果获取失败,说明没有消息,继续下一次循环continue;}//解析消息中的消息MapRecord<String, Object, Object> entries = read.get(0);Map<Object, Object> value = entries.getValue();StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);//这就拿到了消息队列中的数据了,就可以去使用该对象了System.out.println(streamObject);//这就需要确认消息队列stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());} catch (Exception e) {//如果在获取消息过程中出现异常,则需要再次执行该消息任务while (true){try {List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("s1", ReadOffset.from("0")));if (read == null || read.isEmpty()){break;}MapRecord<String, Object, Object> entries = read.get(0);Map<Object, Object> value = entries.getValue();StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);//重新拿到未确认的数据System.out.println(streamObject);//再次进行消息确认Long acknowledge = stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());} catch (Exception ex) {throw new RuntimeException(ex);}}}}}
}

XREADGROUP 命令特点:

        1)消息可回溯

        2)可以多消费者争抢消息,加快消费速度

        3)可以阻塞读取

        4)没有消息漏读的风险

        5)有消息确认机制,保证消息至少被消费一次

相关文章:

Redis 篇-深入了解基于 Redis 实现消息队列(比较基于 List 实现消息队列、基于 PubSub 发布订阅模型之间的区别)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 消息队列的认识 2.0 基于 List 实现消息队列 2.1 基于 List 实现消息队列的优缺点 3.0 基于 PubSub 实现消息队列 3.1 基于 PubSub 的消息队列优缺点 4.0 基于 St…...

python 学习一张图

python学习一张图&#xff0c;python的特点的是学的快&#xff0c;一段时间不用&#xff0c;忘记的也快&#xff0c;弄一张图及一些入门案例吧。 写一个简单的测试&#xff1a; #!/usr/bin/python # -*- coding: UTF-8 -*- import osdef add_num(a, b):return a bif __name__…...

通过Docker部署 MongoDB 服务器

今天我们将在三丰云的免费服务器上进行 MongoDB 的部署测试。这款不错的免费服务器提供了很好的性能&#xff0c;1核CPU、1G内存、10G硬盘和5M带宽&#xff0c;足以满足我们的基本需求。三丰云的服务稳定&#xff0c;操作简单&#xff0c;真是一个值得推荐的选择&#xff0c;特…...

无人机避障雷达技术详解

随着无人机技术的飞速发展&#xff0c;其应用领域已经从最初的军事领域扩展到商业、农业、建筑巡检、应急救援、物流运输等多个领域。在这些多样化的应用场景中&#xff0c;无人机的安全性和稳定性显得尤为重要。无人机避障技术作为保障无人机安全飞行的核心技术之一&#xff0…...

2009-2023年上市公司华证esg评级评分数据(年度+季度)(含细分项)

2009-2023年上市公司华证esg评级评分数据&#xff08;年度季度&#xff09;&#xff08;含细分项&#xff09; 1、时间&#xff1a;2009-2023年 2、来源&#xff1a;整理自wind 3、指标&#xff1a;证券代码、年份、证券简称、评级日期、综合评级、综合得分、E评级、E得分、…...

C++ 模板进阶知识——stdenable_if

目录 C 模板进阶知识——std::enable_if1. 简介和背景基本语法使用场景 2. std::enable_if 的基本用法示例&#xff1a;限制函数模板只接受整数类型 3. SFINAE 和 std::enable_if示例&#xff1a;使用 SFINAE 进行函数重载SFINAE 的优点应用场景 4. 在类模板中使用 std::enable…...

国内外ChatGPT网站集合,无限制使用【2024-09最新】~

经过我一年多以来&#xff0c;使用各种AI工具的体验&#xff0c;我收集了一批AI工具和站点 这些工具都是使用的最强最主流的模型&#xff0c;也都在各个领域里都独领风骚的产品。 而且&#xff0c;这些工具你都可以无限制地使用。 无论你是打工人、科研工作者、学生、文案写…...

如何在VUE3中使用函数式组件

在Vue 3中&#xff0c;函数式组件的概念与Vue 2相似&#xff0c;但实现方式有所不同。函数式组件是一种无状态、无实例的组件&#xff0c;它们只根据传入的props和context来渲染输出。在Vue 3中&#xff0c;你可以通过定义一个函数并返回一个渲染函数来使用函数式组件。但是&am…...

linux访问外网的设置

Ubuntu | LUCKFOX WIKI 开发板配置​ 添加路由信息 sudo route add default gw 172.32.0.100添加 DNS servers 打开文件 sudo vi /etc/resolv.conf添加以下内容: nameserver 8.8.8.8联网测试 ping www.baidu.com开机自动配置 路由信息和 DNS servers 重启后会被清除,我们创建…...

PHP轻松创建高效收集问卷调查小程序系统源码

轻松创建&#xff0c;高效收集 —— 问卷调查小程序&#xff0c;你的调研神器&#xff01; 一、告别繁琐&#xff0c;一键开启调研之旅 还在为设计问卷、收集数据而头疼不已吗&#xff1f;现在&#xff0c;有了“问卷调查小程序”&#xff0c;一切都变得轻松简单&#xff01;无…...

Redis面试必问:Redis为什么快?Redis五大基本数据类型

请记住胡广一句话&#xff0c;所有的中间件所有的框架都是建立在基础之上&#xff0c;数据结构&#xff0c;计算机网络&#xff0c;计算机原理大伙一定得看透&#xff01;&#xff01;~ 1. Redis快的秘密 相信大部分Redis初学者都会忽略掉一个重要的知识点&#xff0c;Redis…...

InternVL2- dockerfile环境变量持久化使用`ENV`而不是`RUN export`来设置环境变量,以确保环境变量在容器运行时仍然可用

在Dockerfile中使用RUN export命令设置环境变量并不是一种持久化的方式。当你在Dockerfile中使用export命令时&#xff0c;它只会在当前构建阶段生效&#xff0c;并不会被持久化到生成的镜像中。这是因为export命令实际上是在shell环境中设置环境变量&#xff0c;而Docker构建过…...

Python(PyTorch和TensorFlow)图像分割卷积网络导图(生物医学)

&#x1f3af;要点 语义分割图像三层分割椭圆图像脑肿瘤图像分割动物图像分割皮肤病变分割多模态医学图像多尺度特征生物医学肖像多类和医学分割通用图像分割模板腹部胰腺图像分割分类注意力网络病灶边界分割气胸图像分割 Python生物医学图像卷积网络 该网络由收缩路径和扩…...

DevOps实现CI/CD实战(七)- Jenkins集成k8s实现自动化CI

自动化CI操作 1. 安装gitlab plugin 工具 ##### 2. 配置流水线任务的构建触发器&#xff0c;复制URL&#xff1a;http://192.168.201.111:8080/project/pipeline 3. Gitlab配置Webhooks&#xff0c;将上面的url&#xff1a;http://192.168.201.111:8080/project/pipeline粘…...

从ES6到ES2023 带你深入了解什么是ES

从ES6到ES2023&#xff0c;我们深入探索ECMAScript&#xff08;简称ES&#xff09;的演变与发展&#xff0c;了解这一JavaScript标准背后的技术革新和进步。ECMAScript作为JavaScript的标准化版本&#xff0c;每年都在不断推出新版本&#xff0c;为开发者带来更加丰富和强大的功…...

openVX加速-常见问题:适用场景、AI加速、安装方式等

1. 哪些算法处理推荐使用 OpenVX OpenVX 是非常适合图像处理和计算机视觉任务的框架&#xff0c;特别是在需要高性能和硬件加速的场景下。如果你的前处理和后处理涉及到图像滤波、边缘检测、颜色转换等操作&#xff0c;使用 OpenVX 可以带来性能提升。 OpenVX 更适合处理以下…...

国产芯片LT8711HE:TYPE-C/DP1.2转HDMI2.0转换器,4k60Hz高分辨率

以下为LT8711HE芯片的简单介绍&#xff0c;如有介绍不尽之处&#xff0c;请指出 LT8711HE是一个高性能的Type-C/DP1.2到HDMI2.0转换器&#xff0c;用于连接USB Type-C源或DP1.2源到HDMI2.0接收器。 LT8711HE集成了一个DP1.2兼容的接收器和一个HDMI2.0兼容的发射器。另外&…...

论文翻译:arxiv-2024 Benchmark Data Contamination of Large Language Models: A Survey

Benchmark Data Contamination of Large Language Models: A Survey https://arxiv.org/abs/2406.04244 大规模语言模型的基准数据污染&#xff1a;一项综述 文章目录 大规模语言模型的基准数据污染&#xff1a;一项综述摘要1 引言 摘要 大规模语言模型&#xff08;LLMs&…...

Java+Swing用户信息管理系统

JavaSwing用户信息管理系统 一、系统介绍二、功能展示1.管理员登陆2.用户信息查询3.用户信息添加4.用户信息修改5.用户信息删除 三、系统实现1.UserDao .java 四、其它1.其他系统实现 一、系统介绍 该系统实现了管理员系统登陆、用户信息查询、用户信息添加、用户信息修改、用…...

数据结构基础详解(C语言): 栈的括号匹配(实战)与栈的表达式求值特殊矩阵的压缩存储

文章目录 栈的应用1.栈的括号匹配代码实战:问题分析:2.栈的表达式求值2.1 中缀、后缀、前缀表达式2.2 中缀表达式改写为后缀表达式(手算)2.3 后缀表达式的计算(手算)2.4 中缀表达式转前缀表达式&#xff08;手算)和计算前缀表达式2.5后缀表达式的计算(机算)2.6 中缀表达式转后缀…...

原创:黄大年茶思屋难题揭榜第141期|5道核心题精简公开·未获技术反馈求指正

黄大年茶思屋难题揭榜第141期&#xff5c;5道核心题精简公开未获技术反馈求指正 作者&#xff1a;华夏之光永存 摘要 这五道题我们已完整解题并提交黄大年茶思屋难题揭榜&#xff0c;最终被退回&#xff0c;但平台未给出任何具体技术驳回意见、未指明缺陷、未提供修改方向。我们…...

Pixel Dream Workshop一文详解:基于diffusers的FluxPipeline定制部署

Pixel Dream Workshop一文详解&#xff1a;基于diffusers的FluxPipeline定制部署 1. 像素幻梦创意工坊概述 Pixel Dream Workshop&#xff08;像素幻梦创意工坊&#xff09;是一款专为像素艺术创作设计的AI生成工具&#xff0c;基于最新的FLUX.1-dev扩散模型构建。与传统AI绘…...

OpenClaw+Qwen3-32B内容创作流:从提纲到公众号发布的自动化

OpenClawQwen3-32B内容创作流&#xff1a;从提纲到公众号发布的自动化 1. 为什么需要自动化内容创作 作为一个技术博主&#xff0c;我每周至少要产出2-3篇深度文章。最痛苦的时刻不是写作本身&#xff0c;而是面对空白文档时的"冷启动"阶段——从选题构思到完成初稿…...

动态数据源配置加密终极指南:如何选择最安全的填充模式保护敏感数据 [特殊字符]️

动态数据源配置加密终极指南&#xff1a;如何选择最安全的填充模式保护敏感数据 &#x1f6e1;️ 【免费下载链接】dynamic-datasource dynamic datasource for springboot 多数据源 动态数据源 主从分离 读写分离 分布式事务 项目地址: https://gitcode.com/gh_mirrors/dy/…...

Pixel Mind Decoder 效果对比评测:在不同文体和语言风格下的表现

Pixel Mind Decoder 效果对比评测&#xff1a;在不同文体和语言风格下的表现 1. 核心能力概览 Pixel Mind Decoder 是一款专注于文本情绪解码的模型&#xff0c;能够识别和分析不同文本中蕴含的情感倾向。与通用情感分析工具不同&#xff0c;它特别擅长处理复杂语境下的微妙情…...

喜马拉雅音频下载工具:技术实现与高效使用指南

喜马拉雅音频下载工具&#xff1a;技术实现与高效使用指南 【免费下载链接】xmly-downloader-qt5 喜马拉雅FM专辑下载器. 支持VIP与付费专辑. 使用GoQt5编写(Not Qt Binding). 项目地址: https://gitcode.com/gh_mirrors/xm/xmly-downloader-qt5 在数字化学习与娱乐场景…...

CST中利用SPICE语言自定义复杂lumped element电路的实战指南

1. 突破CST自带元件的限制&#xff1a;为什么需要SPICE语言 刚开始用CST做电路仿真时&#xff0c;我也觉得自带的RLC元件够用了——直到遇到一个带滤波功能的耦合器项目。当时需要模拟一个包含寄生参数的复杂匹配网络&#xff0c;自带的并联RLC元件死活调不出理想的频响曲线。这…...

CayenneMQTT库详解:嵌入式设备快速接入MQTT平台

1. CayenneMQTT 库概述 CayenneMQTT 是一个专为物联网设备设计的轻量级 MQTT 客户端库&#xff0c;核心目标是将嵌入式终端&#xff08;如 Arduino、ESP8266、ESP32&#xff09;快速、可靠地接入 Cayenne IoT 平台 的可视化仪表盘。该库并非从零实现 MQTT 协议栈&#xff0c…...

AI写的论文如何降到20%以内?分场景教程+工具对比

AI写的论文如何降到20%以内&#xff1f;分场景教程工具对比 “我用DeepSeek写了大半篇论文&#xff0c;导师要求知网AI率必须低于20%&#xff0c;现在已经是52%&#xff0c;我该怎么办&#xff1f;” 这是毕业季最典型的求助问题之一。 不同的情况&#xff0c;处理方法不一样。…...

DanKoe 视频笔记:阅读:改变你生活的简单习惯:概述与引言

https://github.com/OpenDocCN/wealth-notes-zh/raw/master/docs/dankoe/img22971bb5176092c90f7464d7a7aa6e45.png 在本节课中&#xff0c;我们将学习如何通过培养阅读习惯来深刻地改变你的生活。我们将探讨阅读的重要性、如何选择书籍、如何有效阅读&#xff0c;以及如何将阅…...