使用 Redis Streams 实现高性能消息队列
1. 引言
在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消息队列解决方案。
本文将深入探讨 Redis Streams 的核心概念,并演示如何在后端服务中使用 Redis Streams 实现一个高性能的消息队列。
2. Redis Streams 基本概念
Redis Streams 是 Redis 提供的流数据结构,允许存储和消费有序的数据流。它的主要特点包括:
-
持久化存储:不同于 Pub/Sub 仅支持瞬时消息,Streams 支持持久化存储。
-
消费分组(Consumer Groups):支持多个消费者消费不同的消息,提高并行能力。
-
自动消息确认(Acknowledgment):支持消费确认机制,保证消息可靠性。
-
阻塞读取(Blocking Reads):可以使用
XREAD或XREADGROUP进行阻塞式消费,提高实时性。
Redis Streams 的数据结构类似于日志系统,每条消息都带有唯一的 ID 及对应的数据字段,如:
XADD mystream * field1 value1 field2 value2
上面的命令将 field1:value1 和 field2:value2 存入 mystream 流中,* 让 Redis 自动生成 ID。
3. Redis Streams 基本操作
3.1 生产者:添加消息到 Stream
在 Redis 中,使用 XADD 命令向 Stream 发送消息,例如:
XADD my_stream * user_id 12345 action "login"
其中,my_stream 是流名称,* 表示自动生成 ID,user_id 和 action 代表存储的数据。
3.2 消费者:读取 Stream 中的消息
使用 XRANGE 读取 Stream 消息:
XRANGE my_stream - +
- 和 + 表示从头到尾读取所有消息。
3.3 组消费模式(Consumer Groups)
创建消费组:
XGROUP CREATE my_stream my_group 0 MKSTREAM
添加消费者并读取数据:
XREADGROUP GROUP my_group consumer1 COUNT 10 STREAMS my_stream >
确认消息已被处理:
XACK my_stream my_group 1681956776310-0
删除已确认的消息(减少存储占用):
XDEL my_stream 1681956776310-0
4. Redis Streams 在后端开发中的应用
4.1 场景 1:用户行为日志存储
应用 Redis Streams 记录用户行为,如登录、点击、浏览等,后台可实时分析用户数据:
-
生产者:前端或业务逻辑向
user_logs追加用户行为数据。 -
消费者:后端消费日志,存入数据库或进行实时分析。
4.2 场景 2:任务队列
Redis Streams 适合作为任务队列,将任务推送到 Stream,多个 Worker 并发消费,提高处理能力。
-
生产者:任务生成器将任务推送到
task_queue。 -
消费者:多个 Worker 消费任务并处理。
-
优势:相比传统队列,Redis Streams 可回溯未处理的任务,确保任务不会丢失。
5. Redis Streams vs 传统消息队列
| 特性 | Redis Streams | Kafka | RabbitMQ |
|---|---|---|---|
| 消息持久化 | 是 | 是 | 是 |
| 消息确认机制 | 是 | 是 | 是 |
| 并行消费 | 是 | 是 | 是 |
| 去重功能 | 否 | 是 | 否 |
| 性能 | 高 | 超高 | 中 |
从表中可以看出,Redis Streams 适用于轻量级消息队列需求,如日志收集、任务队列等,而 Kafka 适用于高吞吐量场景。
6. 示例代码:基于 Python 的 Redis Streams 生产者 & 消费者
安装 Redis-Py 依赖:
pip install redis
生产者(Producer):
import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)# 发送消息
def send_message():r.xadd('task_queue', {'task_id': '123', 'action': 'process_data'})print("Message sent!")send_message()
消费者(Consumer):
import redisdef consume_messages():r = redis.Redis(host='localhost', port=6379, decode_responses=True)while True:messages = r.xread({'task_queue': '$'}, count=1, block=1000)for stream, msgs in messages:for msg_id, data in msgs:print(f"Processing {data}")r.xack('task_queue', 'task_group', msg_id)consume_messages()
7. 总结
Redis Streams 作为 Redis 5.0 引入的新功能,在高性能消息队列场景下表现出色。相比 Kafka 和 RabbitMQ,Redis Streams 适用于中小型业务场景,如日志收集、任务队列等,同时具备持久化存储、消费分组及确认机制。
如果你的项目已经使用 Redis,并且有消息队列需求,Redis Streams 可能是一个非常合适的选择。
8. 参考资料
-
Redis 官方文档 - Streams
-
Redis Streams vs Kafka
希望这篇文章能帮你快速掌握 Redis Streams 并在实际项目中应用!🎯
相关文章:
使用 Redis Streams 实现高性能消息队列
1. 引言 在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消…...
30.Word:设计并制作新年贺卡以及标签【30】
目录 NO1.2 NO3邮件合并-信函 NO4邮件合并-标签 NO1.2 另存为/F12:考生文件夹:Word.docx布局→页面设置对话框→页边距:上下左右→纸张:宽度/高度(先调页边距🆗)设计→页面颜色→填充效果→…...
Nginx开发01:基础配置
一、下载和启动 1.下载、使用命令行启动:Web开发:web服务器-Nginx的基础介绍(含AI文稿)_nginx作为web服务器,可以承担哪些基本任务-CSDN博客 注意:我配置的端口是81 2.测试连接是否正常 访问Welcome to nginx! 如果…...
数据分析系列--⑨RapidMiner训练集、测试集、验证集划分
一、数据集获取 二、划分数据集 1.导入和加载数据 2.数据集划分 2.1 划分说明 2.2 方法一 2.3 方法二 一、数据集获取 点击下载数据集 此数据集包含538312条数据. 二、划分数据集 1.导入和加载数据 2.数据集划分 2.1 划分说明 2.2 方法一 使用Filter Example Range算子. …...
C基础寒假练习(6)
一、终端输入行数,打印倒金字塔 #include <stdio.h> int main() {int rows;printf("请输入倒金字塔的行数: ");scanf("%d", &rows);for (int i rows; i > 0; i--) {// 打印空格for (int j 0; j < rows - i; j) {printf(&qu…...
mysqldump+-binlog增量备份
注意:二进制文件删除必须使用help purge 不可用rm -f 会崩 一、概念 增量备份:仅备份上次备份以后变化的数据 差异备份:仅备份上次完全备份以后变化的数据 完全备份:顾名思义,将数据完全备份 其中,…...
《DeepSeek R1:大模型最简安装秘籍》
DeepSeek R1:AI 大模型界的新起之秀 在人工智能的璀璨星空中,大模型如繁星般闪耀,而 DeepSeek R1 无疑是其中一颗冉冉升起的新星,自问世以来便吸引了全球的目光,在人工智能领域占据了重要的一席之地。 从性能表现上看…...
FLTK - FLTK1.4.1 - demo - bitmap
文章目录 FLTK - FLTK1.4.1 - demo - bitmap概述笔记END FLTK - FLTK1.4.1 - demo - bitmap 概述 // 功能 : 演示位图数据在按钮上的显示 // * 以按钮为范围或者以窗口为范围移动 // * 上下左右, 文字和图像的相对位置 // 失能按钮,使能按钮 // 知识点 // FLTK可…...
数据库优化:提升性能的关键策略
1. 引言 在后端开发中,数据库的性能直接影响系统的稳定性和响应速度。随着业务增长,数据库查询变慢、负载过高等问题可能会影响用户体验。 本文将介绍数据库优化的关键策略,包括索引优化、查询优化、分库分表、缓存机制等,并结合…...
【Leetcode 每日一题】119. 杨辉三角 II
问题背景 给定一个非负索引 r o w I n d e x rowIndex rowIndex,返回「杨辉三角」的第 r o w I n d e x rowIndex rowIndex 行。 在「杨辉三角」中,每个数是它左上方和右上方的数的和。 数据约束 0 ≤ r o w I n d e x ≤ 33 0 \le rowIndex \le 33 …...
Java小白入门教程:HashSet
目录 一、定义 二、作用 1、存储唯一元素 2、快速查找 3、去除重复 三、使用场景 1、当你需要存储一系列唯一的元素,并且不关心元素的顺序时。 2、当你需要快速判断一个元素是否存在于集合中时。 四、语法及示例 1、创建HashSet 2、添加元素 3、检查元素…...
玩转大语言模型——使用langchain和Ollama本地部署大语言模型
系列文章目录 玩转大语言模型——使用langchain和Ollama本地部署大语言模型 玩转大语言模型——ollama导入huggingface下载的模型 玩转大语言模型——langchain调用ollama视觉多模态语言模型 玩转大语言模型——使用GraphRAGOllama构建知识图谱 玩转大语言模型——完美解决Gra…...
抖♬♬__ac_signature 算法逆向分析
和网页端一样,算法没有问题...
网络编程套接字(中)
文章目录 🍏简单的TCP网络程序服务端创建套接字服务端绑定服务端监听服务端获取连接服务端处理请求客户端创建套接字客户端连接服务器客户端发起请求服务器测试单执行流服务器的弊端 🍐多进程版的TCP网络程序捕捉SIGCHLD信号让孙子进程提供服务 …...
CodeForces 611:New Year and Domino ← 二维前缀和
【题目来源】 https://codeforces.com/contest/611/problem/C 【题目描述】 They say "years are like dominoes, tumbling one after the other". But would a year fit into a grid? I dont think so. Limak is a little polar bear who loves to play. He has r…...
十分钟快速上手 markdown
前言 本人利用寒假期间,将自己所学的markdown的知识,以及将自己常用的一些操作和注意事项记录下来,希望能够帮助大家 一、markdown是什么 Markdown 是一种轻量级标记语言,说白了就是可以让你利用最简单的语法达到最好的排版效果…...
Go语言中的Select
Select 在 Go 语言中,select 是一种用于处理多个通道操作的控制结构。它允许你同时监听多个通道上的通信操作(发送或接收),并根据哪个操作先完成来执行相应的代码块。select 是 Go 并发编程中的一个重要工具,常用于实…...
Vue.js组件开发-实现全屏图片文字缩放切换特效
使用 Vue 实现全屏图片文字缩放切换特效 步骤 创建 Vue 项目:使用 Vue CLI 来快速创建一个新的 Vue 项目。设计组件结构:创建一个包含图片和文字的组件,并实现缩放和切换效果。实现样式:使用 CSS 来实现全屏显示、缩放和切换动画…...
360嵌入式开发面试题及参考答案
解释一下 802.11ax 和 802.11ac/n 有什么区别 速度与带宽 802.11n 支持的最高理论速率为 600Mbps,802.11ac 进一步提升,单流最高可达 866.7Mbps,多流情况下能达到更高,如 1.3Gbps 等。而 802.11ax(Wi-Fi 6)引入了更多先进技术,理论最高速率可达 9.6Gbps,相比前两者有大…...
python recv的概念和使用案例
recv 是网络编程中用于从套接字接收数据的核心函数,常见于 TCP/UDP 通信。以下是其概念、用法和案例详解: 概念 作用:从已连接(TCP)或已绑定(UDP)的套接字接收数据。参数: bufsize:…...
白话DeepSeek-R1论文(三)| DeepSeek-R1蒸馏技术:让小模型“继承”大模型的推理超能力
最近有不少朋友来询问Deepseek的核心技术,陆续针对DeepSeek-R1论文中的核心内容进行解读,并且用大家都能听懂的方式来解读。这是第三篇趣味解读。 DeepSeek-R1蒸馏技术:让小模型“继承”大模型的推理超能力 当大模型成为“老师”,…...
Web3.js详解
Web1&Web2&Web3 以下是Web1、Web2和Web3的详细介绍,以及一个对比表格: Web1 定义:Web1指的是有着固定内容的非许可的开源网络。特点:在Web1时代,网站内容主要由网站管理员或创建者提供,用户只能…...
jvm - GC篇
如何减慢一个对象进入老年代的速度,如何降低GC的次数 堆内存细分 年轻代(Young Generation): 新创建的对象首先被分配在年轻代中。年轻代又被进一步划分为一个Eden区和两个Survivor区(通常称为S0和S1)。…...
vue2项目(一)
项目介绍 电商前台项目 技术架构:vuewebpackvuexvue-routeraxiosless.. 封装通用组件登录注册token购物车支付项目性能优化 一、项目初始化 使用vue create projrct_vue2在命令行窗口创建项目 1.1、脚手架目录介绍 ├── node_modules:放置项目的依赖 ├──…...
【Leetcode 热题 100】64. 最小路径和
问题背景 给定一个包含非负整数的 m n m \times n mn 网格 g r i d grid grid,请找出一条从左上角到右下角的路径,使得路径上的数字总和为最小。 说明:每次只能向下或者向右移动一步。 数据约束 m g r i d . l e n g t h m grid.lengt…...
[LeetCode]day9 203.移除链表元素
203. 移除链表元素 - 力扣(LeetCode) 题目描述 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满足 Node.val val 的节点,并返回 新的头节点 。 示例 1: 输入:head [1,2,6,3,4,5,6], v…...
Recommender Systems with Large Models
一、引言 信息爆炸时代,用户面临信息过载,传统推荐系统依赖经典算法,难以满足需求。大模型基于深度学习,经大规模预训练,具备强大能力,能实现更精准推荐,为推荐系统发展开辟新路径。 二、大模…...
TOF技术原理和静噪对策
本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时,也能帮助其他需要参考的朋友。如有谬误,欢迎大家进行指正。 一、什么是TOF TOF 是Time of Flight的缩写,它是一种通过利用照射波和反射波之间的时间差来测量到物体的距离的测…...
MongoDB常见的运维工具总结介绍
MongoDB 提供了一些强大的运维工具,帮助管理员进行数据库监控、备份、恢复、性能优化等操作。以下是一些常见的 MongoDB 运维工具及其功能介绍: 1. MongoDB Atlas 功能:MongoDB Atlas 是 MongoDB 官方的云托管数据库服务,它提供…...
B-树:解锁大数据存储和与快速存储的密码
在我们学习数据结构的过程中,我们会学习到二叉搜索树、二叉平衡树、红黑树。 这些无一例外,是以一个二叉树展开的,那么对于我们寻找其中存在树中的数据,这个也是一个不错的方法。 但是,如若是遇到了非常大的数据容量…...
