redis原理 6:小道消息 —— PubSub
前面我们讲了 Redis 消息队列的使用方法,但是没有提到 Redis 消息队列的不足之处,那就是它不支持消息的多播机制。
消息多播
消息多播允许生产者生产一次消息,中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。它是分布式系统常用的一种解耦方式,用于将多个消费组的逻辑进行拆分。支持了消息多播,多个消费组的逻辑就可以放到不同的子系统中。
如果是普通的消息队列,就得将多个不同的消费组逻辑串接起来放在一个子系统中,进行连续消费。
PubSub
为了支持消息多播,Redis 不能再依赖于那 5 种基本数据类型了。它单独使用了一个模块来支持消息多播,这个模块的名字叫着 PubSub,也就是 PublisherSubscriber,发布者订阅者模型。我们使用 Python 语言来演示一下 PubSub 如何使用。
python复制代码# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
time.sleep(1)
print p.get_message()
client.publish("codehole", "java comes")
time.sleep(1)
print p.get_message()
client.publish("codehole", "python comes")
time.sleep(1)
print p.get_message()
print p.get_message()
python复制代码{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1L}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
None
客户端发起订阅命令后,Redis 会立即给予一个反馈消息通知订阅成功。因为有网络传输延迟,在 subscribe 命令发出后,需要休眠一会,再通过 get\_message 才能拿到反馈消息。客户端接下来执行发布命令,发布了一条消息。同样因为网络延迟,在 publish 命令发出后,需要休眠一会,再通过 get\_message 才能拿到发布的消息。如果当前没有消息,get\_message 会返回空,告知当前没有消息,所以它不是阻塞的。
Redis PubSub 的生产者和消费者是不同的连接,也就是上面这个例子实际上使用了两个 Redis 的连接。这是必须的,因为 Redis 不允许连接在 subscribe 等待消息时还要进行其它的操作。
在生产环境中,我们很少将生产者和消费者放在同一个线程里。如果它们真要在同一个线程里,何必通过中间件来流转,直接使用函数调用就行。所以我们应该将生产者和消费者分离,接下来我们看看分离后的代码要怎么写。
消费者
py复制代码# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
while True:
msg = p.get_message()
if not msg:
time.sleep(1)
continue
print msg
生产者
py复制代码# -*- coding: utf-8 -*-
import redis
client = redis.StrictRedis()
client.publish("codehole", "python comes")
client.publish("codehole", "java comes")
client.publish("codehole", "golang comes")
必须先启动消费者,然后再执行生产者,消费者我们可以启动多个,pubsub 会保证它们收到的是相同的消息序列。
python复制代码{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1L}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'golang comes'}
我们从消费者的控制台窗口可以看到上面的输出,每个消费者窗口都是同样的输出。第一行是订阅成功消息,它很快就会输出,后面的三行会在生产者进程执行的时候立即输出。 上面的消费者是通过轮询 get_message 来收取消息的,如果收取不到就休眠 1s。这让我们想起了第 3 节的消息队列模型,我们使用 blpop 来代替休眠来提高消息处理的及时性。
PubSub 的消费者如果使用休眠的方式来轮询消息,也会遭遇消息处理不及时的问题。不过我们可以使用 listen 来阻塞监听消息来进行处理,这点同 blpop 原理是一样的。下面我们改造一下消费者
阻塞消费者
py复制代码# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
for msg in p.listen():
print msg
代码简短了很多,不需要再休眠了,消息处理也及时了。
模式订阅
上面提到的订阅模式是基于名称订阅的,消费者订阅一个主题是必须明确指定主题的名称。如果我们想要订阅多个主题,那就 subscribe 多个名称。
sh复制代码> subscribe codehole.image codehole.text codehole.blog # 同时订阅三个主题,会有三条订阅成功反馈信息
1) "subscribe"
2) "codehole.image"
3) (integer) 1
1) "subscribe"
2) "codehole.text"
3) (integer) 2
1) "subscribe"
2) "codehole.blog"
3) (integer) 3
这样生产者向这三个主题发布的消息,这个消费者都可以接收到。
sh复制代码> publish codehole.image https://www.google.com/dudo.png
(integer) 1
> publish codehole.text " 你好,欢迎加入码洞 "
(integer) 1
> publish codehole.blog '{"content": "hello, everyone", "title": "welcome"}'
(integer) 1
如果现在要增加一个主题codehole.group,客户端必须也跟着增加一个订阅指令才可以收到新开主题的消息推送。
为了简化订阅的繁琐,redis 提供了模式订阅功能Pattern Subscribe,这样就可以一次订阅多个主题,即使生产者新增加了同模式的主题,消费者也可以立即收到消息
bash复制代码> psubscribe codehole.* # 用模式匹配一次订阅多个主题,主题以 codehole. 字符开头的消息都可以收到
1) "psubscribe"
2) "codehole.*"
3) (integer) 1
消息结构
前面的消费者消息输出时都是下面的这样一个字典形式
python复制代码{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1L}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'golang comes'}
那这几个字段是什么含义呢?
data 这个毫无疑问就是消息的内容,一个字符串。
channel 这个也很明显,它表示当前订阅的主题名称。
type 它表示消息的类型,如果是一个普通的消息,那么类型就是 message,如果是控制消息,比如订阅指令的反馈,它的类型就是 subscribe,如果是模式订阅的反馈,它的类型就是 psubscribe,还有取消订阅指令的反馈 unsubscribe 和 punsubscribe。
pattern 它表示当前消息是使用哪种模式订阅到的,如果是通过 subscribe 指令订阅的,那么这个字段就是空。
PubSub 缺点
PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。
正是因为 PubSub 有这些缺点,它几乎找不到合适的应用场景。所以 Redis 的作者单独开启了一个项目 Disque 专门用来做多播消息队列。该项目目前没有成熟,一直长期处于 Beta 版本,但是相应的客户端 sdk 已经非常丰富了,就待 Redis 作者临门一脚发布一个 Release 版本。关于 Disque 的更多细节,本小册不会多做详细介绍,感兴趣的同学可以去阅读相关文档。
补充
近期 Redis5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了持久化消息队列,从此 PubSub 可以消失了,Disqueue 估计也永远发不出它的 Release 版本了。
本文由 mdnice 多平台发布
相关文章:
redis原理 6:小道消息 —— PubSub
前面我们讲了 Redis 消息队列的使用方法,但是没有提到 Redis 消息队列的不足之处,那就是它不支持消息的多播机制。 img 消息多播 消息多播允许生产者生产一次消息,中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组…...
Android Studio 的Gradle版本修改
使用Android Studio构建项目时,需要配置Gradle,与Gradle插件。 Gradle是一个构建工具,用于管理和自动化Android项目的构建过程。它使用Groovy或Kotlin作为脚本语言,并提供了强大的配置能力来定义项目的依赖关系、编译选项、打包方…...
Redis的部分面试题
1.Redis是什么?简述它的优缺点? Redis的字符串类型是通过简单动态字符串SDS来实现的。简单动态字符串是Redis自己实现的一种字符串表示方式,相比于C语言中的传统字符串,它具有以下几个特点: 1. 动态调整大小:简单动态字符串可…...
单通道 6GSPS 16位采样DAC子卡模块--【资料下载】
FMC147是一款单通道6.4GSPS(或者配置成2通道3.2GSPS)采样率的12位AD采集、单通道6GSPS(或配置成2通道3GSPS)采样率16位DA输出子卡模块,该板卡为FMC标准,符合VITA57.4规范,该模块可以作为一个理想…...
Python 文件操作详解
概要 Python进行文件操作,在日常编程中是很常用的。为了方便大家,这里对各种文件操作的知识进行汇总。一文在手,无须它求!来一起学习吧。 一、文件的打开和关闭 open()函数 f1 open(rd:\测试文件.txt, moder, encodingutf-8) c…...
【Rust 基础篇】Rust Never类型:表示不会返回的类型
导言 Rust是一种以安全性和高效性著称的系统级编程语言,其设计哲学是在不损失性能的前提下,保障代码的内存安全和线程安全。在Rust中,Never类型是一种特殊的类型,它表示一个函数永远不会返回。Never类型在Rust中有着重要的应用场…...
error “Component name “*****“ should always be multi-word”解决方案
问题 在 vue-cli 创建的项目中,创建文件并命名后,会报 “Component name "*****" should always be multi-word” 报错; Component name "index" should always be multi-word.eslintvue/multi-word-component-names原…...
前后端开发的区别是什么?
VUE的开发方式为什么和后端的MVC开发方式不一样呢? 实际上,Vue 和后端开发的 MVC(Model-View-Controller)方式是不同的,因为它们面对的问题和场景也不同。 前端与后端的职责不同: 前端和后端的职责和任务不…...
小白电脑装机(自用)
几个月前买了配件想自己装电脑,结果最后无法成功点亮,出现的问题是主板上的DebugLED黄灯常亮,即DRAM灯亮。对于微星主板的Debug灯,其含义这篇博文中有说明。 根据另一篇博文,有两种可能。 我这边曾将内存条和主板一块…...
Quic协议 0-RTT
目录 1、Quic协议 2、Quic直接通过TLS握手进行建立链接,TLS是1.3版本 3.1、通过缓存服务器公钥实现0-RTT,服务器 通过kdf密钥派生机制,来产生会话加密key,保证数据向前安全性 3.2、通过PKN来实现数据重传保证数据完整性&…...
在排序数组中查找元素的第一个和最后一个位置——力扣34
文章目录 题目描述法一 二分查找题目描述 法一 二分查找 int bsearch_1(int l, int r) {while (l < r)<...
python列表处理方法
原始文件: id start end a1 10 19 a1 25 34 a2 89 124 a2 149 167 a2 188 221目的文件: a1 1 10 a1 16 25 a2 1 36 a2 61 79 a2 100 133解释说明: 原始文件是gff3文件的一部分,第一列id是基因的名字,第二列和第三列分…...
【Java】快速入门JVM
文章目录 1. JVM简介2. 类加载简介3. 类加载的过程4. 双亲委派5. GC垃圾回收6. JVM的回收方式7. 分代回收 1. JVM简介 JVM(Java虚拟机)是一个名字为Java的进程,是用于执行Java程序的虚拟机。 JVM会从操作系统中申请一大块内存空间,又把这个内存空间划分…...
C#之Winfrom自定义输入框对话框。
如果你需要一个带有输入框的对话框,并在输入完成后接收输入的值,你可以使用自定义窗体来实现。以下是一个示例代码:创建一个继承自 Form 的自定义窗体类,命名为 InputDialogForm,并将窗体上放置一个文本框(…...
docker制作镜像
docker制作镜像 docker制作镜像有两种: 1.docker build dockerfile 2.基于容器制作镜像 基于容器制作镜像 语法:docker commit options 容器名称 参数: -a:作者 -c:修改dockfile创建的镜像 -m:提交…...
广西茶叶元宇宙 武隆以茶为媒 推动茶文旅产业融合发展
8月4日,重庆市武隆区启动为期3天的“武隆首届玩茶荟”。本次活动以“中国最美玩茶地——武隆”为主题,吸引众多国内知名专家、茶企和茶馆相关负责人,共同探索武隆茶文旅融合发展新路径和新业态。 广西茶叶元宇宙:广西茶叶元宇宙 …...
alibaba.excel库使用
目录 依赖 实体类 Controller Service 所用到的接口及工具类 ExcelUtil ExcelListener DefaultExcelListener DefaultExcelResult ExcelResult JsonUtils SpringUtils StreamUtils ValidatorUtils SpringUtils 使用alibab中的excel库来实现excel的导入、导出 依赖 <de…...
机器学习模型选择评估和超参数调优
如何选择模型?如何评估模型?如何调整模型的超参数?模型评估要在测试集上进行,不能在训练集上进行,否则评估的准确率总是100%。所以,一般我们准备好数据集后,要将其分为训练集和测试集࿰…...
深入浅出 Typescript
TypeScript 是 JavaScript 的一个超集,支持 ECMAScript 6 标准(ES6 教程)。 TypeScript 由微软开发的自由和开源的编程语言。 TypeScript 设计目标是开发大型应用,它可以编译成纯 JavaScript,编译出来的 JavaScript …...
Vue3和TypeScript项目-移动端兼容
1 全局安装typescript 2 检测安装成功 3 写的是ts代码,但是最后一定要变成js代码,才能在浏览器使用 这样就会多一个js文件 3 ts语法 数组语法 对象语法 安装vue3项目 成功后进入app。安装依赖。因为我们用的是脚手架,要引入东西的时候不需要…...
如何快速解密科学文库加密文档:终极免费解密工具指南
如何快速解密科学文库加密文档:终极免费解密工具指南 【免费下载链接】ScienceDecrypting 破解CAJViewer带有效期的文档,支持破解科学文库、标准全文数据库下载的文档。无损破解,保留文字和目录,解除有效期限制。 项目地址: htt…...
Node.js环境下的实时口罩检测API开发与部署教程
Node.js环境下的实时口罩检测API开发与部署教程 1. 引言 在当今的智能化场景中,实时口罩检测技术已经成为许多公共场所和企业的必备功能。无论是商场入口、办公大楼还是公共交通场所,快速准确地检测人员是否佩戴口罩都显得尤为重要。 本教程将手把手教…...
Agentic SOC 全阶成长指南:从零到专家,拿下AI安全运营的黄金赛道
2026年RSAC全球网络安全大会落下帷幕,一个行业共识已经不可逆地形成:Agentic SOC,已经从概念验证阶段,正式成为全球企业安全运营的核心标配。 Gartner最新数据显示,2026年全球Agentic SOC相关市场规模突破127亿美元&am…...
Feather生态系统探索:从R包到Python包装器的完整技术栈
Feather生态系统探索:从R包到Python包装器的完整技术栈 【免费下载链接】feather wesm/feather: 是一个用于在 Python 和 R 之间传输数据的轻量级数据格式库。适合对数据科学和数据分析有兴趣的人,特别是需要在 Python 和 R 之间进行数据交换的人。特点是…...
HardSourceWebpackPlugin源码解析:从入口到缓存写入的完整流程
HardSourceWebpackPlugin源码解析:从入口到缓存写入的完整流程 【免费下载链接】hard-source-webpack-plugin 项目地址: https://gitcode.com/gh_mirrors/ha/hard-source-webpack-plugin HardSourceWebpackPlugin是一个为Webpack构建过程提供持久化缓存的插…...
dl-librescore:开源乐谱下载解决方案,打破MuseScore资源获取限制
dl-librescore:开源乐谱下载解决方案,打破MuseScore资源获取限制 【免费下载链接】dl-librescore Download sheet music 项目地址: https://gitcode.com/gh_mirrors/dl/dl-librescore 在音乐创作、教学和学习过程中,获取高质量乐谱资源…...
Tensorflow-Cookbook高级特性解析:Partial Conv、Pixel Shuffle与Spectral Norm
Tensorflow-Cookbook高级特性解析:Partial Conv、Pixel Shuffle与Spectral Norm 【免费下载链接】Tensorflow-Cookbook Simple Tensorflow Cookbook for easy-to-use 项目地址: https://gitcode.com/gh_mirrors/te/Tensorflow-Cookbook Tensorflow-Cookbook是…...
OpenClaw+千问3.5-35B-A3B-FP8:个人知识库自动化更新系统
OpenClaw千问3.5-35B-A3B-FP8:个人知识库自动化更新系统 1. 为什么需要自动化知识库更新 作为一个长期依赖个人知识库的技术写作者,我深刻体会到手动维护知识库的痛点。每当遇到新资料,我需要经历"阅读→摘录→分类→归档"的全流…...
【回眸】系统读书笔记(十)盘点调动资源
目录 前言 资源盘点可以帮你创造选择 三类人生资源 直接价值资源 知识技能类:认知储备和实操能力、学科知识、行业认知、上手操作的技能 记录行为和结果:干成过什么、搞定过什么、负责过什么? 能力逆向推导:把行为翻译成资源…...
飞书机器人集成实战:OpenClaw+Phi-3-vision-128k-instruct打造智能问答助手
飞书机器人集成实战:OpenClawPhi-3-vision-128k-instruct打造智能问答助手 1. 为什么选择这个技术组合? 上周我接到一个产品经理的需求——希望能通过飞书直接发送产品截图,自动获得功能分析报告。传统方案需要开发整套服务端逻辑ÿ…...
