当前位置: 首页 > news >正文

Redis 实现消息队列

Redis 实现消息队列

文章目录

  • Redis 实现消息队列
    • 导引
    • 1. 基于List结构的消息队列
    • 2. 基于PubSub的消息队列
    • 3. 基于Stream的消息队列(推荐)
      • 3.1 XADD
      • 3.2 XREAD
      • 3.3 XGROUP

导引

消息队列(Message Queue),从概念上来理解就是用来存放消息的队列,最简单的消息队列模型包括以下三个角色:

  • 生产者:发送消息到消息队列
  • 消息队列:存储和管理信息,也被称为消息代理(Message Broker)
  • 消费者:从消息队列中获取消息并处理消息

Redis也为我们提供了三种不同的方式来实现消息队列:

  1. List结构:基于List结构模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:比较完善的消息队列模型(推荐

1. 基于List结构的消息队列

这种方式比较简单,因为Redis的list数据结构是一个双向链表,很容易模拟出队列的效果。

队列是入口和出口不在一边,对此我们可以利用:LPUSH 结合 BRPOP,或者 RPUSH 结合 BLPOP 来实现先进先出的效果

在这里插入图片描述

:这里使用BRPOP而不是RPOP是因为BRPOP能够实现阻塞的效果而RPOP不能

使用该方式实现消息队列的优缺点如下:

优点

  • 利用Redis存储,不受限与JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 能够满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

2. 基于PubSub的消息队列

PubSub(发布订阅),是Redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息

它有以下命令:

  • SUBSCRIBE channel [channel]:订阅一个或多个频道

    在这里插入图片描述

  • PUBLISH channel msg:向一个频道发送消息

    在这里插入图片描述

  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

    在这里插入图片描述

具体操作如下所示:

在这里插入图片描述

该方式实现的消息队列支持多消费者的使用,但也存在着以下弊端:

  • 不能支持数据持久化,一旦redis宕机数据就会丢失
  • 无法避免消息丢失
  • 消息堆积有上限,超出上限后数据会丢失

3. 基于Stream的消息队列(推荐)

Stream是Redis5.0引入的一种新数据类型,能够实现功能完善的消息队列,因为它本身就是一个消息队列,所以我们可以直接通过命令来使用它:

3.1 XADD

作用:发送消息

在这里插入图片描述

其中:

  • key:队列名称

  • [NOMKSTREAM]:如果队列不存在,是否自动创建队列,默认是自动创建

  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量

  • *|ID:消息的唯一id,表示由Redis自动生成,格式是“时间戳-递增数字”,一般推荐使用来自动生成

  • field value [field value …]:发送到队列中的消息,以键值对的格式录入,可以多个同时录入

举个栗子🌰:

创建一个名为 users 的队列,并向其中发送一个消息,内容是:{name=Json, age=25},并使用Redis自动生成ID

在这里插入图片描述

3.2 XREAD

作用:读取消息

在这里插入图片描述

其中:

  • [COUNT count]:指定每次读取消息的最大数量
  • [BLOCK milliseconds]:当队列中没有消息时,阻塞指定时长,单位为秒
  • STREAMS key [key …]:要从哪个队列中读取消息,key就是队列名,可以指定多个队列
  • ID [ID …]起始ID,只返回大于该ID的消息,其中0代表从第一个消息开始,$代表从最新的消息开始

举个栗子🌰:

读取users队列中的第一个消息

在这里插入图片描述

:在上述测试中我们只往users队列中添加了一个消息,这个时候如果ID使用$来获取最新消息,且设置了阻塞等待的话,此时读取信息将在阻塞时间过后返回空:

在这里插入图片描述

3.3 XGROUP

消费者组,一个消费者组中可以有多个消费者来操作同一个消息队列

在这里插入图片描述

通常由以下命令组成:

  • 创建消费者组:

    XGROUP Create key groupName ID [MKSTREAM]
    

    其中:

    • key:队列名称
    • groupName:消费者组名称
    • ID:起始ID标识,0代表队列中第一个消息,$代表队列最后一个消息
    • MKSTREAM:队列不存在时自动创建队列

    在这里插入图片描述

    :这里要求队列key已经存在才能创建消费者组,否则需要开启MKSTREAM让其自动创建新的队列

    在这里插入图片描述

  • 删除指定的消费者组:

    XGROUP Destroy key groupName
    

    在这里插入图片描述

  • 给指定消费者组添加消费者

    XGROUP CREATECONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 删除指定消费者组中的消费者

    XGROUP DELCONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 从消费者组中读取消息

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    

    在这里插入图片描述

    其中:

    • group:消费者组名称
    • consumer:消费者名称,如果消费者不存在,会根据该名称自动创建一个消费者
    • count:本次查询的最大数量
    • BLOCK milliseconds:阻塞时间,没有消息时会进行等待,以毫秒为单位
    • NOACK:选择后无需手动ACK,获取到消息后自动确认,一般不建议设置,当我们获取完消息后需要手动确认ack
    • STREAMS key:指定队列名称
    • ID:获取消息的起始ID,它有以下情况:
      • >”:表示从下一个未消费的消息开始
      • 其它:根据指定id从pending-list中获取消息,pending-list用于专门存放那些已消费但未确认的消息;例如此时ID为0,表示获取pending-list中的第一个消息.

    同一个消费者组中的消费者读取同一个消息队列时,若ID使用>来读取,则下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息队列中的消息都被读取过后,最后一个读取的消费者返回nil

    举个栗子🌰:

    我们创建一个队列叫list,再添加几条消息

    在这里插入图片描述

    在这里插入图片描述

    创建一个消费者组g1监听list消息队列

    在这里插入图片描述

    通过XREADGROUP命令为消费者组g1添加消费者c1、c2、c3来读取list队列消息

    在这里插入图片描述

可以看到,同一个消费者组中的消费者,它们都在获取同一个队列中的消息,且ID使用>来读取,下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息全部被读完后只返回nil!

每读取完一条消息,我们需要对它进行手动确认,使其从pending-list中移除,使用下述命令可以查看已读取但还未确认的消息:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

我们查看一下list中还有多少未确认的消息:

在这里插入图片描述

好的都没被确认,所以需要我们手动去确认消息,使其从pending-list中移除,操作命令如下:

XACK key group ID [ID ...]

在这里插入图片描述

上述的ID为添加消息时自动创建并返回的ID:

在这里插入图片描述

这样所有已读取的消息就会从pending-list中移除了!

这里贴上该消息队列在Java中的实现方式

//获取消息队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000ms STEAMS list >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("list", ReadOffset.lastConsumed())
);//ACK确认 SACK list g1 id
// 注:因为这里我们只从消息队列中获取一条信息(COUNT 1),所以list.get()使用索引0即可
stringRedisTemplate.opsForStream().acknowledge("list", "g1", list.get(0).getId()); //获取pending_list中的消息 XREADGROUP GROUP g1 c1 COUNT 1 STEAMS list 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("list", ReadOffset.from("0"))
);
// 上述代码可以配合循环实现被消费者组不断监听的消息队列

以上便是对Redis实现消息队列的介绍了!!如果内容对大家有帮助的话请给这篇文章一个三连关注吧💕( •̀ ω •́ )✧✨

相关文章:

Redis 实现消息队列

Redis 实现消息队列 文章目录 Redis 实现消息队列导引1. 基于List结构的消息队列2. 基于PubSub的消息队列3. 基于Stream的消息队列(推荐)3.1 XADD3.2 XREAD3.3 XGROUP 导引 消息队列(Message Queue)&#xff0c;从概念上来理解就是用来存放消息的队列&#xff0c;最简单的消息…...

模板初阶(详解)

一、泛型编程 为了引出模板&#xff0c;我们来看下面代码&#xff0c;比如要实现不同类型的交换函数&#xff0c;如下&#xff1a; void Swap(int& a, int& b) {int c a;a b;b c; } void Swap(char& a, char& b) {char c a;a b;b c; } void Swap(doubl…...

对称加密算法解析:DES、AES及其在`pycryptodome` 和 `crypto-js` 模块中的应用

&#x1f4da; 对称加密算法解析&#xff1a;DES、AES及其在pycryptodome 和 crypto-js 模块中的应用 &#x1f5dd;️ DES 算法 算法原理 数据加密标准&#xff08;DES&#xff09;是一种对称密钥加密算法&#xff0c;用于保护数据的安全。DES 使用一个 56 位的密钥进行加密…...

C++设计模式(代理模式)

1. 电话虫 在海贼中&#xff0c;有一种神奇的通信工具叫做电话虫&#xff08;Den Den Mushi&#xff09;&#xff0c;外形如蜗牛&#xff0c;身上带有斑点或条纹或通体纯色&#xff0c;壳顶上有对讲机或按键&#xff0c;不接通时会睡觉&#xff0c;接通时会惊醒&#xff0c;并发…...

Linux系统驱动(十三)Linux内核定时器

文章目录 一、内核定时器原理二、定时器API三、使用定时器让LED灯闪烁四、使用定时器对按键进行消抖 一、内核定时器原理 内核当前时间通过jiffies获取&#xff0c;它是内核时钟节拍数&#xff0c;在linux内核启动的时候&#xff0c;jiffies开始&#xff08;按照一定频率&…...

Visual Studio 调试时加载符号慢

什么是调试符号 编译程序时生成的一组特殊字符&#xff0c;并包含有关变量和函数在生成的二进制文件中的位置以及其他服务信息的信息。 该数据集可用于逐步调试程序或检查第三方代码。 调试符号可以添加到可执行文件或库中&#xff0c;但是大多数现代编译器将它们存储为单独的…...

Spring Cloud Config:动态配置的魔法师

Spring Cloud Config&#xff1a;动态配置的魔法师 在微服务架构的浩瀚星海中&#xff0c;配置管理如同一颗璀璨的星辰&#xff0c;而Spring Cloud Config则是那颗能够实现配置信息集中管理和动态刷新的魔法星。本文将深入探索Spring Cloud Config的奥秘&#xff0c;揭示如何通…...

Webpack入门基础知识及案例

webpack相信大家都已经不陌生了&#xff0c;应用程序的静态模块打包工具。前面我们总结了vue&#xff0c;react入门基础知识&#xff0c;也分别做了vue3的实战小案例&#xff0c;react的实战案例&#xff0c;那么我们如何使用webpack对项目进行模块化打包呢&#xff1f; 话不多…...

对中国人工智能与国外人工智能的思考

作为一名语文老师&#xff0c;我在教育的领域中见证着时代的变迁&#xff0c;也关注着科技的发展&#xff0c;尤其是人工智能这一前沿领域。当我们将目光投向中国人工智能与国外人工智能的发展时&#xff0c;心中不禁涌起诸多思考。 中国的人工智能近年来犹如一颗璀璨的新星&am…...

【debian系统arm架构安装docker】且换源后依旧不行就离线导入镜像

安装docker 在Debian系统上安装Docker并使用阿里云的镜像源可以通过以下步骤完成 1.更新软件包索引 前置如果需要更换源的请移步 : 初始化配置(自动连wifi,自动开启SSH)换清华源,远程桌面连接 sudo apt-get update2.安装必要的软件包以允许apt通过HTTPS使用仓库 sudo apt-get …...

Readwise 官方 Obsidian 插件使用

Readwise 官方 Obsidian 插件简介 Obsidian 中的 Readwise 注释示例 阅读已经发布了官方插件来导入你的 Readwise 数据Obsidian 。如果你还没有使用过 Readwise&#xff0c;那么值得一看。Readwise 是我最喜欢的 应用之一。我每天都在用它。因此&#xff0c;将这些数据自动导…...

A. A+B Again?

time limit per test 1 second memory limit per test 256 megabytes Given a two-digit positive integer nn, find the sum of its digits. Input The first line contains an integer tt (1≤t≤901≤t≤90) — the number of test cases. The only line of each tes…...

pr样机模板视频素材|城市户外高速路广告牌视频样机

https://prmuban.com/40369.html pr样机素材&#xff0c;全高清实景城市户外高速路广告牌视频样机模板&#xff0c;适合宣传视频制作。 主要特点&#xff1a; Adobe Premiere Pro 2024 全高清分辨率&#xff08;19201080&#xff09; 易于使用 快速渲染 无需插件 预览中使用的…...

谷歌大中华区总裁:所有企业都在问这个问题

中国开发者对于出海的热情&#xff0c;令Google大中华区总裁陈俊廷感慨。2024 Google I/O Connect&#xff08;2024Google开发者大会&#xff09;期间&#xff0c;他在接受第一财经记者独家采访时提到一个细节&#xff1a;早上7:30&#xff0c;他来到会场时&#xff0c;人们已经…...

GPT-4o:AI视觉识别的革命性飞跃

在AI的宏伟叙事中&#xff0c;图像识别技术始终扮演着关键角色。随着技术的不断演进&#xff0c;AI的视界已超越了简单的图像内容识别&#xff0c;它现在能够将视觉信息转化为引人入胜的文字描述。OpenAI最新力作——GPT-4o模型&#xff0c;以其卓越的多模态理解能力&#xff0…...

将电脑打造成私人网盘,支持外网访问之详细操作教程

你想过把自己电脑打造成随时随地访问的网盘吗&#xff1f;就是那种拥有一个属于自己的影音库&#xff0c;不用担心被和谐&#xff0c;随时可以登录访问电脑上的各种文件&#xff0c;相比传统网盘省心又安全。 使用Everything和节点小宝将电脑搭建成私人网盘&#xff0c;可以实现…...

spring同一个接口如何分页实现主表或主+联表group by查询

1 需求背景 我们的上游系统想要知道主表的记录关联子表所有记录中是否有一条满足特定的条件&#xff0c;如果有满足的就返回主表中的id。比如品牌brand主表中id为10的记录&#xff0c;在子表brand_rel中有id为1&#xff0c;2&#xff0c;3&#xff0c;4的这四个记录&#xff0…...

SpringDataJpa源码分析

我们在定义Repository的时候通常定义的时一个接口&#xff0c;而并没有去实现这个接口&#xff0c;那么Jpa是如何让开发者无需自己实现接口就可以使用Repository去操作数据库&#xff1f; 动态代理&#xff01;&#xff01;&#xff01; Repository原理 试想一下JPA是如何做的…...

卷积神经网络 - 卷积神经网络与深度学习的历史篇

序言 卷积神经网络&#xff08; Convolutional Neural Networks, CNN \text{Convolutional Neural Networks, CNN} Convolutional Neural Networks, CNN&#xff09;与深度学习作为人工智能领域的两大重要分支&#xff0c;其发展历程充满了探索与突破。深度学习&#xff0c;作…...

初识 Floodfall 算法

文章目录 **一、Floodfall 算法的概述****二、深度优先搜索&#xff08;DFS&#xff09;和广度优先搜索&#xff08;BFS&#xff09;在 Floodfall 算法中的应用****三、算法的基本原理****四、应用场景** 一、Floodfall 算法的概述 Floodfall 算法通常用于解决与区域填充、图的…...

PostgreSQL 17安装后必做的5件事:从安全加固到性能调优(附pg_hba.conf配置详解)

PostgreSQL 17安装后必做的5件事&#xff1a;从安全加固到性能调优 刚完成PostgreSQL 17的安装只是数据库旅程的第一步。要让这个强大的关系型数据库真正发挥生产级效能&#xff0c;还需要一系列精细化的配置。本文将带你完成五个关键步骤&#xff0c;从安全策略到性能参数&…...

proteus新手福音:用快马平台轻松生成第一个电路仿真项目

作为一个刚接触电子电路设计的萌新&#xff0c;第一次打开Proteus时真的被满屏的英文界面和密密麻麻的元件库吓到了。直到发现了InsCode(快马)平台&#xff0c;用自然语言描述就能生成完整的仿真项目&#xff0c;简直是新手救星&#xff01;下面分享我的第一个LED闪烁电路仿真实…...

Kimi-VL-A3B-Thinking开源可部署:提供ONNX导出路径与推理引擎适配

Kimi-VL-A3B-Thinking开源可部署&#xff1a;提供ONNX导出路径与推理引擎适配 1. 模型简介 Kimi-VL-A3B-Thinking是一款高效的开源混合专家&#xff08;MoE&#xff09;视觉语言模型&#xff08;VLM&#xff09;&#xff0c;具备以下核心特点&#xff1a; 参数高效&#xff…...

SQL代码质量守护神:sql-lint实现数据库开发效率革命性突破

SQL代码质量守护神&#xff1a;sql-lint实现数据库开发效率革命性突破 【免费下载链接】sql-lint An SQL linter 项目地址: https://gitcode.com/gh_mirrors/sq/sql-lint 在现代数据库开发流程中&#xff0c;SQL代码的质量直接关系到系统稳定性与数据安全。据行业统计&a…...

企业级翻译系统落地:TranslateGemma助力国际化团队代码协作

企业级翻译系统落地&#xff1a;TranslateGemma助力国际化团队代码协作 1. 引言&#xff1a;全球化开发的语言挑战 在跨国企业技术团队中&#xff0c;代码协作常常面临语言障碍&#xff1a;核心框架文档是英文&#xff0c;而部分团队成员更习惯使用中文&#xff1b;开源项目注…...

为什么说res-downloader能3步搞定全网资源下载?从新手到高手的实战指南

为什么说res-downloader能3步搞定全网资源下载&#xff1f;从新手到高手的实战指南 【免费下载链接】res-downloader 视频号、小程序、抖音、快手、小红书、直播流、m3u8、酷狗、QQ音乐等常见网络资源下载! 项目地址: https://gitcode.com/GitHub_Trending/re/res-downloader…...

终极指南:如何快速为设计添加地图填充效果 - Sketch Map Generator 插件完全解析

终极指南&#xff1a;如何快速为设计添加地图填充效果 - Sketch Map Generator 插件完全解析 【免费下载链接】sketch-map-generator Sketch plugin to fill a shape with a map generated from a given location using Google Maps and Mapbox 项目地址: https://gitcode.co…...

MT5 Zero-Shot中文文本增强企业应用:提升标注效率50%实测报告

MT5 Zero-Shot中文文本增强企业应用&#xff1a;提升标注效率50%实测报告 1. 引言&#xff1a;当数据标注成为AI落地的瓶颈 想象一下这个场景&#xff1a;你的AI团队开发了一个智能客服模型&#xff0c;需要大量高质量的对话数据进行训练。数据工程师们夜以继日地标注数据&am…...

从视频收藏到内容管理:BilibiliDown图形化下载器深度解析

从视频收藏到内容管理&#xff1a;BilibiliDown图形化下载器深度解析 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader &#x1f633; 项目地址: https://gitcode.com/gh_mirror…...

别再死磕手册了!手把手教你用TwinCAT 3搞定EtherCAT CIA402从站配置(附状态机避坑点)

TwinCAT 3实战&#xff1a;EtherCAT CIA402从站配置全流程解析与状态机避坑指南 第一次接触EtherCAT CIA402协议栈时&#xff0c;面对ETG6010手册里密密麻麻的对象字典和状态机转换规则&#xff0c;相信不少工程师都有过这样的困惑&#xff1a;为什么我的驱动器始终无法进入Ope…...