当前位置: 首页 > news >正文

使用 Redis Streams 实现高性能消息队列

1. 引言

在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消息队列解决方案。

本文将深入探讨 Redis Streams 的核心概念,并演示如何在后端服务中使用 Redis Streams 实现一个高性能的消息队列。


2. Redis Streams 基本概念

Redis Streams 是 Redis 提供的流数据结构,允许存储和消费有序的数据流。它的主要特点包括:

  • 持久化存储:不同于 Pub/Sub 仅支持瞬时消息,Streams 支持持久化存储。

  • 消费分组(Consumer Groups):支持多个消费者消费不同的消息,提高并行能力。

  • 自动消息确认(Acknowledgment):支持消费确认机制,保证消息可靠性。

  • 阻塞读取(Blocking Reads):可以使用 XREADXREADGROUP 进行阻塞式消费,提高实时性。

Redis Streams 的数据结构类似于日志系统,每条消息都带有唯一的 ID 及对应的数据字段,如:

XADD mystream * field1 value1 field2 value2

上面的命令将 field1:value1field2:value2 存入 mystream 流中,* 让 Redis 自动生成 ID。


3. Redis Streams 基本操作

3.1 生产者:添加消息到 Stream

在 Redis 中,使用 XADD 命令向 Stream 发送消息,例如:

XADD my_stream * user_id 12345 action "login"

其中,my_stream 是流名称,* 表示自动生成 ID,user_idaction 代表存储的数据。

3.2 消费者:读取 Stream 中的消息

使用 XRANGE 读取 Stream 消息:

XRANGE my_stream - +

-+ 表示从头到尾读取所有消息。

3.3 组消费模式(Consumer Groups)

创建消费组:

XGROUP CREATE my_stream my_group 0 MKSTREAM

添加消费者并读取数据:

XREADGROUP GROUP my_group consumer1 COUNT 10 STREAMS my_stream >

确认消息已被处理:

XACK my_stream my_group 1681956776310-0

删除已确认的消息(减少存储占用):

XDEL my_stream 1681956776310-0

4. Redis Streams 在后端开发中的应用

4.1 场景 1:用户行为日志存储

应用 Redis Streams 记录用户行为,如登录、点击、浏览等,后台可实时分析用户数据:

  • 生产者:前端或业务逻辑向 user_logs 追加用户行为数据。

  • 消费者:后端消费日志,存入数据库或进行实时分析。

4.2 场景 2:任务队列

Redis Streams 适合作为任务队列,将任务推送到 Stream,多个 Worker 并发消费,提高处理能力。

  • 生产者:任务生成器将任务推送到 task_queue

  • 消费者:多个 Worker 消费任务并处理。

  • 优势:相比传统队列,Redis Streams 可回溯未处理的任务,确保任务不会丢失。


5. Redis Streams vs 传统消息队列

特性Redis StreamsKafkaRabbitMQ
消息持久化
消息确认机制
并行消费
去重功能
性能超高

从表中可以看出,Redis Streams 适用于轻量级消息队列需求,如日志收集、任务队列等,而 Kafka 适用于高吞吐量场景。


6. 示例代码:基于 Python 的 Redis Streams 生产者 & 消费者

安装 Redis-Py 依赖

pip install redis

生产者(Producer)

import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)# 发送消息
def send_message():r.xadd('task_queue', {'task_id': '123', 'action': 'process_data'})print("Message sent!")send_message()

消费者(Consumer)

import redisdef consume_messages():r = redis.Redis(host='localhost', port=6379, decode_responses=True)while True:messages = r.xread({'task_queue': '$'}, count=1, block=1000)for stream, msgs in messages:for msg_id, data in msgs:print(f"Processing {data}")r.xack('task_queue', 'task_group', msg_id)consume_messages()

7. 总结

Redis Streams 作为 Redis 5.0 引入的新功能,在高性能消息队列场景下表现出色。相比 Kafka 和 RabbitMQ,Redis Streams 适用于中小型业务场景,如日志收集、任务队列等,同时具备持久化存储、消费分组及确认机制。

如果你的项目已经使用 Redis,并且有消息队列需求,Redis Streams 可能是一个非常合适的选择。


8. 参考资料

  • Redis 官方文档 - Streams

  • Redis Streams vs Kafka


希望这篇文章能帮你快速掌握 Redis Streams 并在实际项目中应用!🎯

相关文章:

使用 Redis Streams 实现高性能消息队列

1. 引言 在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消…...

深度学习|表示学习|卷积神经网络|DeconvNet是什么?|18

如是我闻: DeconvNet(反卷积网络)是一种可视化 CNN(卷积神经网络)内部特征的方法,用于理解 CNN 是如何提取图像特征的。这个方法由 Zeiler & Fergus(2013) 提出,目的…...

(优先级队列(堆)) 【本节目标】 1. 掌握堆的概念及实现 2. 掌握 PriorityQueue 的使用

优先级队列(堆) 1. 优先级队列1.1 概念 2. 优先级队列的模拟实现2.1 堆的概念2.2 堆的存储方式2.3 堆的创建2.3.1 堆向下调整2.3.2 堆的创建2.3.3 建堆的时间复杂度 【本节目标】 掌握堆的概念及实现掌握 PriorityQueue 的使用 1. 优先级队列 1.1 概念…...

优化数据库结构

MySQL学习大纲 一个好的数据库设计方案对于数据库的性能尝尝会起到事倍功半的效果,合理的数据库结构不仅使数据库占用更小的磁盘空间,而且使查询速度更快。数据库结构的设计需要考虑数据冗余、查询和更新速度、字段的数据类型是否合理等多方面的内容&…...

密云生活的初体验

【】在《岁末随笔之碎碎念》里,我通告了自己搬新家的事情。乙巳年开始,我慢慢与大家分享自己买房装修以及在新家的居住体验等情况。 跳过买房装修的内容,今天先说说这三个月的生活体验。 【白河】 潮白河是海河水系五大河之一,贯穿…...

图像分类与目标检测算法

在计算机视觉领域,图像分类与目标检测是两项至关重要的技术。它们通过对图像进行深入解析和理解,为各种应用场景提供了强大的支持。本文将详细介绍这两项技术的算法原理、技术进展以及当前的落地应用。 一、图像分类算法 图像分类是指将输入的图像划分为…...

计算机网络——流量控制

流量控制的基本方法是确保发送方不会以超过接收方处理能力的速度发送数据包。 通常的做法是接收方会向发送方提供某种反馈,如: (1)停止&等待 在任何时候只有一个数据包在传输,发送方发送一个数据包,…...

体验 DeepSeek 多模态大模型 Janus-Pro-7B

含有图片的链接: https://mp.weixin.qq.com/s/i6kuVcGU1CUMYRPDM-bKog?token2020918682&langzh_CN 继上篇文章下载了 Janus-Pro-7B 后,准备本地运行时发现由于电脑配置配置太低(显存小于24G),无法运行&#xff0…...

使用mockttp库模拟HTTP服务器和客户端进行单元测试

简介 mockttp 是一个用于在 Node.js 中模拟 HTTP 服务器和客户端的库。它可以帮助我们进行单元测试和集成测试,而不需要实际发送 HTTP 请求。 安装 npm install mockttp types/mockttp模拟http服务测试 首先导入并创建一个本地服务器实例 import { getLocal } …...

解决每次打开终端都需要source ~/.bashrc的问题(记录)

新服务器或者电脑通常需要设置一些环境变量,例如新电脑安装了Anaconda等软件,在配置环境变量后发现每次都需要重新source,非常麻烦,执行下面添加脚本实现一劳永逸 vim .bash_profile# .bash_profileif [ -f ~/.bashrc ]; then. ~…...

UE5 蓝图学习计划 - Day 14:搭建基础游戏场景

在上一节中,我们 确定了游戏类型,并完成了 项目搭建、角色蓝图的基础设置(移动)。今天,我们将进一步完善 游戏场景,搭建 地形、墙壁、机关、触发器 等基础元素,并添加角色跳跃功能,为…...

C++常用拷贝和替换算法

算法简介: copy // 容器内指定的元素拷贝到另一容器replace // 将容器内指定范围的旧元素改为新元素replace_if // 容器内指定范围满足条件的元素替换为新元素swap //互换两个容器的元素 1. copy 功能描述: 将容器内指定范围的数据拷贝到另一容器中函…...

取消和确认按钮没有显示的问题

取消和确认按钮没有显示的问题<template #footer> <template #footer> <!-- 使用插槽名称 #footer --> <span class"dialog-footer"> <el-button click"dialogVisible false">取消</el-button> …...

Python安居客二手小区数据爬取(2025年)

目录 2025年安居客二手小区数据爬取观察目标网页观察详情页数据准备工作&#xff1a;安装装备就像打游戏代码详解&#xff1a;每行代码都是你的小兵完整代码大放送爬取结果 2025年安居客二手小区数据爬取 这段时间需要爬取安居客二手小区数据&#xff0c;看了一下相关教程基本…...

Java/Kotlin HashMap 等集合引发 ConcurrentModificationException

在对一些非并发集合同时进行读写的时候&#xff0c;会抛出 ConcurrentModificationException 异常产生示例 示例一&#xff08;单线程&#xff09;&#xff1a; 遍历集合时候去修改 抛出 ConcurrentModificationException 的主要原因是当你在遍历一个集合&#xff08;如 Map…...

【Day31 LeetCode】动态规划DP Ⅳ

一、动态规划DP Ⅳ 1、最后一块石头的重量II 1049 这题有点像脑筋急转弯&#xff0c;尽量让石头分成重量相同的两堆&#xff08;尽可能相同&#xff09;&#xff0c;相撞之后剩下的石头就是最小的。明白这一点&#xff0c;就与上一篇博客里的划分等和数组很相似。划分等和数组…...

Unity 2D实战小游戏开发跳跳鸟 - 记录显示最高分

上一篇文章中我们实现了游戏的开始界面,在开始界面中有一个最高分数的UI,本文将接着实现记录最高分数以及在开始界面中显示最高分数的功能。 添加跳跳鸟死亡事件 要记录最高分,则需要在跳跳鸟死亡时去进行判断当前的分数是否是最高分,如果是最高分则进行记录,如果低于之前…...

Ollama AI 开发助手完全指南:从入门到实践

本文将详细介绍如何使用 Ollama AI 开发助手来提升开发效率,包括环境搭建、模型选择、最佳实践等全方位内容。 © ivwdcwso (ID: u012172506) 目录 基础环境配置模型选择与使用开发工具集成实践应用场景性能优化与注意事项最佳实践总结一、基础环境配置 1.1 系统要求 在…...

Racecar Gym

Racecar Gym 参考&#xff1a;https://github.com/axelbr/racecar_gym/blob/master/README.md 1. 项目介绍 Racecar Gym 是一个基于 PyBullet 物理引擎的 reinforcement learning (RL) 训练环境&#xff0c;模拟微型 F1Tenth 竞速赛车。它兼容 Gym API 和 PettingZoo API&am…...

代码随想录36 动态规划

leetcode 343.整数拆分 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输入: n 2 输出: 1 解释: 2 1 1, 1 1 1。 示例 2: 输入: n 1…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区&#xff08;Partitioning&#xff09;是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分&#xff08;分区&#xff09;可以独立存储、管理和优化&#xff0c;…...

适应性Java用于现代 API:REST、GraphQL 和事件驱动

在快速发展的软件开发领域&#xff0c;REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名&#xff0c;不断适应这些现代范式的需求。随着不断发展的生态系统&#xff0c;Java 在现代 API 方…...

FFmpeg avformat_open_input函数分析

函数内部的总体流程如下&#xff1a; avformat_open_input 精简后的代码如下&#xff1a; int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...