当前位置: 首页 > news >正文

使用 Redis Streams 实现高性能消息队列

1. 引言

在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消息队列解决方案。

本文将深入探讨 Redis Streams 的核心概念,并演示如何在后端服务中使用 Redis Streams 实现一个高性能的消息队列。


2. Redis Streams 基本概念

Redis Streams 是 Redis 提供的流数据结构,允许存储和消费有序的数据流。它的主要特点包括:

  • 持久化存储:不同于 Pub/Sub 仅支持瞬时消息,Streams 支持持久化存储。

  • 消费分组(Consumer Groups):支持多个消费者消费不同的消息,提高并行能力。

  • 自动消息确认(Acknowledgment):支持消费确认机制,保证消息可靠性。

  • 阻塞读取(Blocking Reads):可以使用 XREADXREADGROUP 进行阻塞式消费,提高实时性。

Redis Streams 的数据结构类似于日志系统,每条消息都带有唯一的 ID 及对应的数据字段,如:

XADD mystream * field1 value1 field2 value2

上面的命令将 field1:value1field2:value2 存入 mystream 流中,* 让 Redis 自动生成 ID。


3. Redis Streams 基本操作

3.1 生产者:添加消息到 Stream

在 Redis 中,使用 XADD 命令向 Stream 发送消息,例如:

XADD my_stream * user_id 12345 action "login"

其中,my_stream 是流名称,* 表示自动生成 ID,user_idaction 代表存储的数据。

3.2 消费者:读取 Stream 中的消息

使用 XRANGE 读取 Stream 消息:

XRANGE my_stream - +

-+ 表示从头到尾读取所有消息。

3.3 组消费模式(Consumer Groups)

创建消费组:

XGROUP CREATE my_stream my_group 0 MKSTREAM

添加消费者并读取数据:

XREADGROUP GROUP my_group consumer1 COUNT 10 STREAMS my_stream >

确认消息已被处理:

XACK my_stream my_group 1681956776310-0

删除已确认的消息(减少存储占用):

XDEL my_stream 1681956776310-0

4. Redis Streams 在后端开发中的应用

4.1 场景 1:用户行为日志存储

应用 Redis Streams 记录用户行为,如登录、点击、浏览等,后台可实时分析用户数据:

  • 生产者:前端或业务逻辑向 user_logs 追加用户行为数据。

  • 消费者:后端消费日志,存入数据库或进行实时分析。

4.2 场景 2:任务队列

Redis Streams 适合作为任务队列,将任务推送到 Stream,多个 Worker 并发消费,提高处理能力。

  • 生产者:任务生成器将任务推送到 task_queue

  • 消费者:多个 Worker 消费任务并处理。

  • 优势:相比传统队列,Redis Streams 可回溯未处理的任务,确保任务不会丢失。


5. Redis Streams vs 传统消息队列

特性Redis StreamsKafkaRabbitMQ
消息持久化
消息确认机制
并行消费
去重功能
性能超高

从表中可以看出,Redis Streams 适用于轻量级消息队列需求,如日志收集、任务队列等,而 Kafka 适用于高吞吐量场景。


6. 示例代码:基于 Python 的 Redis Streams 生产者 & 消费者

安装 Redis-Py 依赖

pip install redis

生产者(Producer)

import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)# 发送消息
def send_message():r.xadd('task_queue', {'task_id': '123', 'action': 'process_data'})print("Message sent!")send_message()

消费者(Consumer)

import redisdef consume_messages():r = redis.Redis(host='localhost', port=6379, decode_responses=True)while True:messages = r.xread({'task_queue': '$'}, count=1, block=1000)for stream, msgs in messages:for msg_id, data in msgs:print(f"Processing {data}")r.xack('task_queue', 'task_group', msg_id)consume_messages()

7. 总结

Redis Streams 作为 Redis 5.0 引入的新功能,在高性能消息队列场景下表现出色。相比 Kafka 和 RabbitMQ,Redis Streams 适用于中小型业务场景,如日志收集、任务队列等,同时具备持久化存储、消费分组及确认机制。

如果你的项目已经使用 Redis,并且有消息队列需求,Redis Streams 可能是一个非常合适的选择。


8. 参考资料

  • Redis 官方文档 - Streams

  • Redis Streams vs Kafka


希望这篇文章能帮你快速掌握 Redis Streams 并在实际项目中应用!🎯

相关文章:

使用 Redis Streams 实现高性能消息队列

1. 引言 在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消…...

深度学习|表示学习|卷积神经网络|DeconvNet是什么?|18

如是我闻: DeconvNet(反卷积网络)是一种可视化 CNN(卷积神经网络)内部特征的方法,用于理解 CNN 是如何提取图像特征的。这个方法由 Zeiler & Fergus(2013) 提出,目的…...

(优先级队列(堆)) 【本节目标】 1. 掌握堆的概念及实现 2. 掌握 PriorityQueue 的使用

优先级队列(堆) 1. 优先级队列1.1 概念 2. 优先级队列的模拟实现2.1 堆的概念2.2 堆的存储方式2.3 堆的创建2.3.1 堆向下调整2.3.2 堆的创建2.3.3 建堆的时间复杂度 【本节目标】 掌握堆的概念及实现掌握 PriorityQueue 的使用 1. 优先级队列 1.1 概念…...

优化数据库结构

MySQL学习大纲 一个好的数据库设计方案对于数据库的性能尝尝会起到事倍功半的效果,合理的数据库结构不仅使数据库占用更小的磁盘空间,而且使查询速度更快。数据库结构的设计需要考虑数据冗余、查询和更新速度、字段的数据类型是否合理等多方面的内容&…...

密云生活的初体验

【】在《岁末随笔之碎碎念》里,我通告了自己搬新家的事情。乙巳年开始,我慢慢与大家分享自己买房装修以及在新家的居住体验等情况。 跳过买房装修的内容,今天先说说这三个月的生活体验。 【白河】 潮白河是海河水系五大河之一,贯穿…...

图像分类与目标检测算法

在计算机视觉领域,图像分类与目标检测是两项至关重要的技术。它们通过对图像进行深入解析和理解,为各种应用场景提供了强大的支持。本文将详细介绍这两项技术的算法原理、技术进展以及当前的落地应用。 一、图像分类算法 图像分类是指将输入的图像划分为…...

计算机网络——流量控制

流量控制的基本方法是确保发送方不会以超过接收方处理能力的速度发送数据包。 通常的做法是接收方会向发送方提供某种反馈,如: (1)停止&等待 在任何时候只有一个数据包在传输,发送方发送一个数据包,…...

体验 DeepSeek 多模态大模型 Janus-Pro-7B

含有图片的链接: https://mp.weixin.qq.com/s/i6kuVcGU1CUMYRPDM-bKog?token2020918682&langzh_CN 继上篇文章下载了 Janus-Pro-7B 后,准备本地运行时发现由于电脑配置配置太低(显存小于24G),无法运行&#xff0…...

使用mockttp库模拟HTTP服务器和客户端进行单元测试

简介 mockttp 是一个用于在 Node.js 中模拟 HTTP 服务器和客户端的库。它可以帮助我们进行单元测试和集成测试,而不需要实际发送 HTTP 请求。 安装 npm install mockttp types/mockttp模拟http服务测试 首先导入并创建一个本地服务器实例 import { getLocal } …...

解决每次打开终端都需要source ~/.bashrc的问题(记录)

新服务器或者电脑通常需要设置一些环境变量,例如新电脑安装了Anaconda等软件,在配置环境变量后发现每次都需要重新source,非常麻烦,执行下面添加脚本实现一劳永逸 vim .bash_profile# .bash_profileif [ -f ~/.bashrc ]; then. ~…...

UE5 蓝图学习计划 - Day 14:搭建基础游戏场景

在上一节中,我们 确定了游戏类型,并完成了 项目搭建、角色蓝图的基础设置(移动)。今天,我们将进一步完善 游戏场景,搭建 地形、墙壁、机关、触发器 等基础元素,并添加角色跳跃功能,为…...

C++常用拷贝和替换算法

算法简介: copy // 容器内指定的元素拷贝到另一容器replace // 将容器内指定范围的旧元素改为新元素replace_if // 容器内指定范围满足条件的元素替换为新元素swap //互换两个容器的元素 1. copy 功能描述: 将容器内指定范围的数据拷贝到另一容器中函…...

取消和确认按钮没有显示的问题

取消和确认按钮没有显示的问题<template #footer> <template #footer> <!-- 使用插槽名称 #footer --> <span class"dialog-footer"> <el-button click"dialogVisible false">取消</el-button> …...

Python安居客二手小区数据爬取(2025年)

目录 2025年安居客二手小区数据爬取观察目标网页观察详情页数据准备工作&#xff1a;安装装备就像打游戏代码详解&#xff1a;每行代码都是你的小兵完整代码大放送爬取结果 2025年安居客二手小区数据爬取 这段时间需要爬取安居客二手小区数据&#xff0c;看了一下相关教程基本…...

Java/Kotlin HashMap 等集合引发 ConcurrentModificationException

在对一些非并发集合同时进行读写的时候&#xff0c;会抛出 ConcurrentModificationException 异常产生示例 示例一&#xff08;单线程&#xff09;&#xff1a; 遍历集合时候去修改 抛出 ConcurrentModificationException 的主要原因是当你在遍历一个集合&#xff08;如 Map…...

【Day31 LeetCode】动态规划DP Ⅳ

一、动态规划DP Ⅳ 1、最后一块石头的重量II 1049 这题有点像脑筋急转弯&#xff0c;尽量让石头分成重量相同的两堆&#xff08;尽可能相同&#xff09;&#xff0c;相撞之后剩下的石头就是最小的。明白这一点&#xff0c;就与上一篇博客里的划分等和数组很相似。划分等和数组…...

Unity 2D实战小游戏开发跳跳鸟 - 记录显示最高分

上一篇文章中我们实现了游戏的开始界面,在开始界面中有一个最高分数的UI,本文将接着实现记录最高分数以及在开始界面中显示最高分数的功能。 添加跳跳鸟死亡事件 要记录最高分,则需要在跳跳鸟死亡时去进行判断当前的分数是否是最高分,如果是最高分则进行记录,如果低于之前…...

Ollama AI 开发助手完全指南:从入门到实践

本文将详细介绍如何使用 Ollama AI 开发助手来提升开发效率,包括环境搭建、模型选择、最佳实践等全方位内容。 © ivwdcwso (ID: u012172506) 目录 基础环境配置模型选择与使用开发工具集成实践应用场景性能优化与注意事项最佳实践总结一、基础环境配置 1.1 系统要求 在…...

Racecar Gym

Racecar Gym 参考&#xff1a;https://github.com/axelbr/racecar_gym/blob/master/README.md 1. 项目介绍 Racecar Gym 是一个基于 PyBullet 物理引擎的 reinforcement learning (RL) 训练环境&#xff0c;模拟微型 F1Tenth 竞速赛车。它兼容 Gym API 和 PettingZoo API&am…...

代码随想录36 动态规划

leetcode 343.整数拆分 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输入: n 2 输出: 1 解释: 2 1 1, 1 1 1。 示例 2: 输入: n 1…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...

自然语言处理——文本分类

文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益&#xff08;IG&#xff09; 分类器设计贝叶斯理论&#xff1a;线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别&#xff0c; 有单标签多类别文本分类和多…...

前端高频面试题2:浏览器/计算机网络

本专栏相关链接 前端高频面试题1&#xff1a;HTML/CSS 前端高频面试题2&#xff1a;浏览器/计算机网络 前端高频面试题3&#xff1a;JavaScript 1.什么是强缓存、协商缓存&#xff1f; 强缓存&#xff1a; 当浏览器请求资源时&#xff0c;首先检查本地缓存是否命中。如果命…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁

赛门铁克威胁猎手团队最新报告披露&#xff0c;数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据&#xff0c;严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能&#xff0c;但SEMR…...