当前位置: 首页 > news >正文

使用 Redis Streams 实现高性能消息队列

1. 引言

在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消息队列解决方案。

本文将深入探讨 Redis Streams 的核心概念,并演示如何在后端服务中使用 Redis Streams 实现一个高性能的消息队列。


2. Redis Streams 基本概念

Redis Streams 是 Redis 提供的流数据结构,允许存储和消费有序的数据流。它的主要特点包括:

  • 持久化存储:不同于 Pub/Sub 仅支持瞬时消息,Streams 支持持久化存储。

  • 消费分组(Consumer Groups):支持多个消费者消费不同的消息,提高并行能力。

  • 自动消息确认(Acknowledgment):支持消费确认机制,保证消息可靠性。

  • 阻塞读取(Blocking Reads):可以使用 XREADXREADGROUP 进行阻塞式消费,提高实时性。

Redis Streams 的数据结构类似于日志系统,每条消息都带有唯一的 ID 及对应的数据字段,如:

XADD mystream * field1 value1 field2 value2

上面的命令将 field1:value1field2:value2 存入 mystream 流中,* 让 Redis 自动生成 ID。


3. Redis Streams 基本操作

3.1 生产者:添加消息到 Stream

在 Redis 中,使用 XADD 命令向 Stream 发送消息,例如:

XADD my_stream * user_id 12345 action "login"

其中,my_stream 是流名称,* 表示自动生成 ID,user_idaction 代表存储的数据。

3.2 消费者:读取 Stream 中的消息

使用 XRANGE 读取 Stream 消息:

XRANGE my_stream - +

-+ 表示从头到尾读取所有消息。

3.3 组消费模式(Consumer Groups)

创建消费组:

XGROUP CREATE my_stream my_group 0 MKSTREAM

添加消费者并读取数据:

XREADGROUP GROUP my_group consumer1 COUNT 10 STREAMS my_stream >

确认消息已被处理:

XACK my_stream my_group 1681956776310-0

删除已确认的消息(减少存储占用):

XDEL my_stream 1681956776310-0

4. Redis Streams 在后端开发中的应用

4.1 场景 1:用户行为日志存储

应用 Redis Streams 记录用户行为,如登录、点击、浏览等,后台可实时分析用户数据:

  • 生产者:前端或业务逻辑向 user_logs 追加用户行为数据。

  • 消费者:后端消费日志,存入数据库或进行实时分析。

4.2 场景 2:任务队列

Redis Streams 适合作为任务队列,将任务推送到 Stream,多个 Worker 并发消费,提高处理能力。

  • 生产者:任务生成器将任务推送到 task_queue

  • 消费者:多个 Worker 消费任务并处理。

  • 优势:相比传统队列,Redis Streams 可回溯未处理的任务,确保任务不会丢失。


5. Redis Streams vs 传统消息队列

特性Redis StreamsKafkaRabbitMQ
消息持久化
消息确认机制
并行消费
去重功能
性能超高

从表中可以看出,Redis Streams 适用于轻量级消息队列需求,如日志收集、任务队列等,而 Kafka 适用于高吞吐量场景。


6. 示例代码:基于 Python 的 Redis Streams 生产者 & 消费者

安装 Redis-Py 依赖

pip install redis

生产者(Producer)

import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)# 发送消息
def send_message():r.xadd('task_queue', {'task_id': '123', 'action': 'process_data'})print("Message sent!")send_message()

消费者(Consumer)

import redisdef consume_messages():r = redis.Redis(host='localhost', port=6379, decode_responses=True)while True:messages = r.xread({'task_queue': '$'}, count=1, block=1000)for stream, msgs in messages:for msg_id, data in msgs:print(f"Processing {data}")r.xack('task_queue', 'task_group', msg_id)consume_messages()

7. 总结

Redis Streams 作为 Redis 5.0 引入的新功能,在高性能消息队列场景下表现出色。相比 Kafka 和 RabbitMQ,Redis Streams 适用于中小型业务场景,如日志收集、任务队列等,同时具备持久化存储、消费分组及确认机制。

如果你的项目已经使用 Redis,并且有消息队列需求,Redis Streams 可能是一个非常合适的选择。


8. 参考资料

  • Redis 官方文档 - Streams

  • Redis Streams vs Kafka


希望这篇文章能帮你快速掌握 Redis Streams 并在实际项目中应用!🎯

相关文章:

使用 Redis Streams 实现高性能消息队列

1. 引言 在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消…...

深度学习|表示学习|卷积神经网络|DeconvNet是什么?|18

如是我闻: DeconvNet(反卷积网络)是一种可视化 CNN(卷积神经网络)内部特征的方法,用于理解 CNN 是如何提取图像特征的。这个方法由 Zeiler & Fergus(2013) 提出,目的…...

(优先级队列(堆)) 【本节目标】 1. 掌握堆的概念及实现 2. 掌握 PriorityQueue 的使用

优先级队列(堆) 1. 优先级队列1.1 概念 2. 优先级队列的模拟实现2.1 堆的概念2.2 堆的存储方式2.3 堆的创建2.3.1 堆向下调整2.3.2 堆的创建2.3.3 建堆的时间复杂度 【本节目标】 掌握堆的概念及实现掌握 PriorityQueue 的使用 1. 优先级队列 1.1 概念…...

优化数据库结构

MySQL学习大纲 一个好的数据库设计方案对于数据库的性能尝尝会起到事倍功半的效果,合理的数据库结构不仅使数据库占用更小的磁盘空间,而且使查询速度更快。数据库结构的设计需要考虑数据冗余、查询和更新速度、字段的数据类型是否合理等多方面的内容&…...

密云生活的初体验

【】在《岁末随笔之碎碎念》里,我通告了自己搬新家的事情。乙巳年开始,我慢慢与大家分享自己买房装修以及在新家的居住体验等情况。 跳过买房装修的内容,今天先说说这三个月的生活体验。 【白河】 潮白河是海河水系五大河之一,贯穿…...

图像分类与目标检测算法

在计算机视觉领域,图像分类与目标检测是两项至关重要的技术。它们通过对图像进行深入解析和理解,为各种应用场景提供了强大的支持。本文将详细介绍这两项技术的算法原理、技术进展以及当前的落地应用。 一、图像分类算法 图像分类是指将输入的图像划分为…...

计算机网络——流量控制

流量控制的基本方法是确保发送方不会以超过接收方处理能力的速度发送数据包。 通常的做法是接收方会向发送方提供某种反馈,如: (1)停止&等待 在任何时候只有一个数据包在传输,发送方发送一个数据包,…...

体验 DeepSeek 多模态大模型 Janus-Pro-7B

含有图片的链接: https://mp.weixin.qq.com/s/i6kuVcGU1CUMYRPDM-bKog?token2020918682&langzh_CN 继上篇文章下载了 Janus-Pro-7B 后,准备本地运行时发现由于电脑配置配置太低(显存小于24G),无法运行&#xff0…...

使用mockttp库模拟HTTP服务器和客户端进行单元测试

简介 mockttp 是一个用于在 Node.js 中模拟 HTTP 服务器和客户端的库。它可以帮助我们进行单元测试和集成测试,而不需要实际发送 HTTP 请求。 安装 npm install mockttp types/mockttp模拟http服务测试 首先导入并创建一个本地服务器实例 import { getLocal } …...

解决每次打开终端都需要source ~/.bashrc的问题(记录)

新服务器或者电脑通常需要设置一些环境变量,例如新电脑安装了Anaconda等软件,在配置环境变量后发现每次都需要重新source,非常麻烦,执行下面添加脚本实现一劳永逸 vim .bash_profile# .bash_profileif [ -f ~/.bashrc ]; then. ~…...

UE5 蓝图学习计划 - Day 14:搭建基础游戏场景

在上一节中,我们 确定了游戏类型,并完成了 项目搭建、角色蓝图的基础设置(移动)。今天,我们将进一步完善 游戏场景,搭建 地形、墙壁、机关、触发器 等基础元素,并添加角色跳跃功能,为…...

C++常用拷贝和替换算法

算法简介: copy // 容器内指定的元素拷贝到另一容器replace // 将容器内指定范围的旧元素改为新元素replace_if // 容器内指定范围满足条件的元素替换为新元素swap //互换两个容器的元素 1. copy 功能描述: 将容器内指定范围的数据拷贝到另一容器中函…...

取消和确认按钮没有显示的问题

取消和确认按钮没有显示的问题<template #footer> <template #footer> <!-- 使用插槽名称 #footer --> <span class"dialog-footer"> <el-button click"dialogVisible false">取消</el-button> …...

Python安居客二手小区数据爬取(2025年)

目录 2025年安居客二手小区数据爬取观察目标网页观察详情页数据准备工作&#xff1a;安装装备就像打游戏代码详解&#xff1a;每行代码都是你的小兵完整代码大放送爬取结果 2025年安居客二手小区数据爬取 这段时间需要爬取安居客二手小区数据&#xff0c;看了一下相关教程基本…...

Java/Kotlin HashMap 等集合引发 ConcurrentModificationException

在对一些非并发集合同时进行读写的时候&#xff0c;会抛出 ConcurrentModificationException 异常产生示例 示例一&#xff08;单线程&#xff09;&#xff1a; 遍历集合时候去修改 抛出 ConcurrentModificationException 的主要原因是当你在遍历一个集合&#xff08;如 Map…...

【Day31 LeetCode】动态规划DP Ⅳ

一、动态规划DP Ⅳ 1、最后一块石头的重量II 1049 这题有点像脑筋急转弯&#xff0c;尽量让石头分成重量相同的两堆&#xff08;尽可能相同&#xff09;&#xff0c;相撞之后剩下的石头就是最小的。明白这一点&#xff0c;就与上一篇博客里的划分等和数组很相似。划分等和数组…...

Unity 2D实战小游戏开发跳跳鸟 - 记录显示最高分

上一篇文章中我们实现了游戏的开始界面,在开始界面中有一个最高分数的UI,本文将接着实现记录最高分数以及在开始界面中显示最高分数的功能。 添加跳跳鸟死亡事件 要记录最高分,则需要在跳跳鸟死亡时去进行判断当前的分数是否是最高分,如果是最高分则进行记录,如果低于之前…...

Ollama AI 开发助手完全指南:从入门到实践

本文将详细介绍如何使用 Ollama AI 开发助手来提升开发效率,包括环境搭建、模型选择、最佳实践等全方位内容。 © ivwdcwso (ID: u012172506) 目录 基础环境配置模型选择与使用开发工具集成实践应用场景性能优化与注意事项最佳实践总结一、基础环境配置 1.1 系统要求 在…...

Racecar Gym

Racecar Gym 参考&#xff1a;https://github.com/axelbr/racecar_gym/blob/master/README.md 1. 项目介绍 Racecar Gym 是一个基于 PyBullet 物理引擎的 reinforcement learning (RL) 训练环境&#xff0c;模拟微型 F1Tenth 竞速赛车。它兼容 Gym API 和 PettingZoo API&am…...

代码随想录36 动态规划

leetcode 343.整数拆分 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输入: n 2 输出: 1 解释: 2 1 1, 1 1 1。 示例 2: 输入: n 1…...

工业设计必看:SolidWorks曲面建模中的NURBS核心原理与7个避坑指南(2024版)

工业设计进阶&#xff1a;SolidWorks曲面建模中的NURBS核心原理与高阶实践&#xff08;2024版&#xff09; 在汽车外壳的流线型曲面或消费电子产品的有机形态背后&#xff0c;NURBS&#xff08;非均匀有理B样条&#xff09;技术始终是工业设计软件的核心引擎。作为SolidWorks等…...

RK3588嵌入式Linux开发实战:uboot任意键中断autoboot功能实现

1. 为什么需要任意键中断autoboot功能 在嵌入式Linux开发中&#xff0c;uboot作为系统启动的"引路人"&#xff0c;承担着硬件初始化、内核加载等重要任务。RK3588这类高性能处理器在启动时&#xff0c;默认会进入autoboot倒计时流程。这个设计本意是好的——当系统正…...

零基础部署Fun-ASR语音识别:支持GPU/CPU/MPS,开箱即用无需配置

零基础部署Fun-ASR语音识别&#xff1a;支持GPU/CPU/MPS&#xff0c;开箱即用无需配置 1. 为什么选择Fun-ASR&#xff1f; 语音识别技术已经成为现代办公和内容创作的重要工具&#xff0c;但传统解决方案往往面临三大痛点&#xff1a;部署复杂、准确率不足、依赖云端服务。Fu…...

贪心-摆动序列、不重叠字串数量

Ref 贪心B站搜索-折半搜索 分发饼干 class Solution { public:int findContentChildren(vector<int>& g, vector<int>& s) {sort(g.begin(),g.end());sort(s.begin(),s.end());int cnt0;for(int i0,j0;i<g.size()&&j<s.size();){if(s[j]&…...

nomic-embed-text-v2-moe保姆级教程:Gradio自定义CSS主题与响应式布局

nomic-embed-text-v2-moe保姆级教程&#xff1a;Gradio自定义CSS主题与响应式布局 1. 从零开始&#xff1a;认识nomic-embed-text-v2-moe 如果你正在寻找一个既强大又好用的文本嵌入模型&#xff0c;特别是需要处理多语言内容&#xff0c;那么nomic-embed-text-v2-moe绝对值得…...

别再只会setValue了!Qt进度条QProgressBar/QProgressDialog的5个实战技巧与避坑指南

别再只会setValue了&#xff01;Qt进度条QProgressBar/QProgressDialog的5个实战技巧与避坑指南 在开发文件管理器、下载工具或数据处理软件时&#xff0c;进度条往往是用户最直观的体验指标之一。一个"聪明"的进度条不仅能准确反映任务状态&#xff0c;还能提升用户…...

LangGraph实战:从零构建并部署一个多功能智能体

1. LangGraph框架概述&#xff1a;新一代智能体开发范式 在人工智能应用开发领域&#xff0c;智能体&#xff08;Agent&#xff09;技术正经历着从简单问答到复杂任务执行的进化。LangGraph作为LangChain生态中的新一代开发框架&#xff0c;彻底改变了传统链式结构的局限性。我…...

嵌入式Linux开发必备远程连接工具详解

1. 嵌入式Linux开发常用远程连接工具技术解析1.1 远程连接工具在嵌入式开发中的重要性嵌入式Linux开发过程中&#xff0c;开发人员经常需要远程访问目标设备进行调试、文件传输或系统监控。由于嵌入式设备通常资源有限且缺乏本地交互界面&#xff0c;远程连接工具成为开发流程中…...

手把手教你用XCVU3P和FMC+接口搭建高性能PCIe载板(附原理图下载)

基于XCVU3P与FMC的高性能PCIe载板开发实战指南 在当今高速数据处理领域&#xff0c;FPGA因其并行计算能力和可重构特性成为关键器件。Xilinx UltraScale系列的XCVU3P芯片配合FMC扩展接口&#xff0c;为开发者提供了强大的硬件加速平台。本文将深入解析如何从零开始构建一个支持…...

Debugging torch.distributed.DistBackendError: NCCL Communicator Setup and ncclUniqueId Retrieval Iss

1. 理解NCCL通信错误的核心问题 当你看到torch.distributed.DistBackendError: [2] is setting up NCCL communicator and retrieving ncclUniqueId这个错误时&#xff0c;本质上是在说GPU之间的"对讲机"无法正常建立连接。想象一下你正在组织一场多房间的线上会议&…...