Redis队列与Pub/Sub方案全解析:原理、对比与实战性能测试
一、为什么选择Redis实现消息队列?
Redis凭借其内存级操作(微秒级响应)、丰富的数据结构以及持久化能力,成为构建高性能消息队列的热门选择。相比传统消息队列(如Kafka/RabbitMQ),Redis在以下场景表现突出:
• 轻量级任务调度:毫秒级任务分发
• 实时数据处理:日志采集、事件驱动架构
• 高并发队列:电商秒杀、API限流
• 实时广播:即时通知、实时数据推送
二、主流实现方案对比
方案对比维度
特性 | List结构队列 | Stream类型队列 | Sorted Set队列 | Pub/Sub |
---|---|---|---|---|
消息持久化 | ❌(依赖Redis配置,配置RDB或AOF后可以持久化) | ✔️(内置持久化) | ❌(依赖Redis配置) | ❌(纯内存) |
消息广播 | ❌ | ❌ | ❌ | ✔️(一对多) |
离线消息 | ❌ | ✔️(存储未ACK消息) | ❌ | ❌(立即丢弃) |
订阅模式 | ❌ | ❌ | ❌ | ✔️(频道/模式匹配) |
典型延迟 | 0.8ms | 1.2ms | 2.7ms | 0.2ms |
适用场景 | 任务队列 | 可靠消息处理 | 定时任务 | 实时通知 |
三、核心方案实现详解
方案1:List结构队列(简单队列)
核心原理
// 生产者
jedis.lpush("task_queue", taskJson);// 消费者(阻塞模式)
List<String> result = jedis.brpop(0, "task_queue");
String task = result.get(1);
持久化机制
• RDB持久化:定时生成内存快照(需配置save
参数)
• AOF持久化:记录所有写操作命令(需配置appendonly yes
)
• 验证方法:
# 查看当前持久化配置
CONFIG GET save
CONFIG GET appendonly
特性分析
• 优点:实现简单,性能极高(TPS 10万+)
• 缺点:无ACK机制,持久化依赖Redis配置
• 适用场景:日志采集、非关键任务队列
方案2:Stream类型队列(企业级队列)
核心原理
// 生产者
String messageId = jedis.xadd("order_stream", "*", "status", "created","amount", "99.9");// 消费者组消费
Map.Entry<String, String> entry = jedis.xreadGroup("order_group", "consumer1", XReadGroupParams.xReadGroupParams().count(1).streamOffset("order_stream", ">"),"order_stream"
).get(0);// 确认消息
jedis.xack("order_stream", "order_group", entry.getKey());
核心优势
• 消费者组:支持多消费者并行处理
• 消息确认:ACK机制保证消息不丢失
• 消息回溯:可查看历史消息(7天默认)
方案3:Sorted Set延迟队列
核心原理
// 投递延迟任务(延迟30分钟)
long delaySeconds = 1800;
jedis.zadd("delay_queue", System.currentTimeMillis() + delaySeconds*1000, taskJson);// 轮询处理
Set<String> tasks = jedis.zrangeByScore("delay_queue", 0, System.currentTimeMillis()
);
应用场景
• 订单超时处理
• 支付回调重试
• 定时任务调度
方案4:Pub/Sub实时消息系统
核心原理
// 发布者
jedis.publish("stock_updates", JSON.toJSONString(stockData));// 订阅者
JedisPubSub subscriber = new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {handleRealTimeUpdate(message);}
};
jedis.subscribe(subscriber, "stock_updates");
核心特性
• 广播模式:一对多实时消息推送
• 模式匹配:支持通配符订阅(如news.*
)
• 低延迟:微秒级消息传递
四、Java实战代码示例
4.1 List队列完整实现
public class ListQueue {private static final String KEY = "list_queue";private Jedis jedis;public ListQueue() {this.jedis = new Jedis("localhost", 6379);}// 生产者public void produce(String task) {jedis.lpush(KEY, task);}// 消费者(阻塞模式)public String consume() {while (true) {List<String> result = jedis.brpop(0, KEY);if (result != null && !result.isEmpty()) {return result.get(1);}}}
}
4.2 Stream队列消费者组
public class StreamQueue {private static final String STREAM_KEY = "stream_queue";private static final String GROUP_NAME = "order_group";private Jedis jedis;public StreamQueue() {this.jedis = new Jedis("localhost", 6379);createConsumerGroup();}private void createConsumerGroup() {try {jedis.xgroupCreate(STREAM_KEY, GROUP_NAME, "0");} catch (Exception e) {// 组已存在}}// 消费者处理public void processMessages() {while (true) {Map.Entry<String, String> entry = jedis.xreadGroup(GROUP_NAME, "consumer1", XReadGroupParams.xReadGroupParams().count(1).streamOffset(STREAM_KEY, ">"),STREAM_KEY).get(0);String msgId = entry.getKey();Map<String, String> fields = EntryToMap(entry.getValue());processTask(fields);jedis.xack(STREAM_KEY, GROUP_NAME, msgId);}}private Map<String, String> EntryToMap(String value) {// 解析Stream消息格式return Arrays.stream(value.split(",")).map(entry -> entry.split("=")).collect(Collectors.toMap(a -> a[0], a -> a[1]));}
}
4.3 Pub/Sub实时通知
public class PubSubDemo {public static void main(String[] args) {// 发布者线程new Thread(() -> {try (Jedis jedis = new Jedis("localhost")) {for (int i = 0; i < 1000; i++) {jedis.publish("realtime_alerts", String.format("{\"event\":\"alert\",\"id\":%d}", i));Thread.sleep(100);}}}).start();// 订阅者线程new Thread(() -> {Jedis jedis = new Jedis("localhost");jedis.subscribe(new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {System.out.printf("[实时通知] %s: %s%n", channel, message);}}, "realtime_alerts");}).start();}
}
五、性能测试对比
测试环境
• 硬件:4核8G CentOS 7.9
• Redis版本:6.2.6(混合持久化)
• 客户端:Jedis 4.2.3
• 并发量:500线程
测试结果(单位:TPS)
方案 | 吞吐量 | 平均延迟 | CPU占用 | 消息可靠性 |
---|---|---|---|---|
List队列(无持久化) | 122,300 | 0.8ms | 38% | ❌(重启丢失) |
List队列(AOF) | 98,500 | 1.5ms | 45% | ✔️(AOF每秒同步) |
Stream队列 | 85,600 | 1.2ms | 45% | ✔️(ACK机制) |
Sorted Set队列 | 38,400 | 2.7ms | 29% | ✔️(定时轮询) |
Pub/Sub | 182,450 | 0.4ms | 32% | ❌(离线丢失) |
六、生产环境配置建议
- List队列持久化配置
# Redis.conf 配置示例
save 900 1 # 900秒内至少1次修改触发保存
save 300 10 # 300秒内至少10次修改
save 60 10000 # 60秒内至少10000次修改
appendonly yes
appendfsync everysec # 每秒同步(性能与安全平衡)
- 混合持久化方案
// 关键业务数据双写保障
jedis.lpush("critical_task", taskJson); // 写List
jedis.xadd("critical_stream", "*", "data", taskJson); // 写Stream
七、选型决策树
八、关键注意事项
- List队列持久化陷阱
• 大Key风险:单List超过1GB会显著降低性能
• 持久化阻塞:AOF重写期间可能延迟飙升
• 解决方案:
// 拆分大List为多个子List
String listKey = "task_list_" + (taskId % 10);
jedis.lpush(listKey, taskJson);
- Stream消息过期策略
# 自动清理旧消息(保留最近1000条)
XTRIM order_stream MAXLEN ~ 1000
通过本文的完整分析,开发者可以明确:
• List队列的持久化能力完全依赖Redis服务端配置,需显式启用AOF/RDB
• Stream队列是唯一内置可靠持久化的方案,适合核心业务场景
• Pub/Sub仅适用于实时广播场景,需配合其他方案实现消息持久化
生产环境建议采用混合架构:
• 用Pub/Sub处理实时通知
• 用Stream处理关键业务数据
• 用List处理高吞吐量日志(需配置持久化)
• 用Sorted Set处理定时任务
相关文章:

Redis队列与Pub/Sub方案全解析:原理、对比与实战性能测试
一、为什么选择Redis实现消息队列? Redis凭借其内存级操作(微秒级响应)、丰富的数据结构以及持久化能力,成为构建高性能消息队列的热门选择。相比传统消息队列(如Kafka/RabbitMQ),Redis在以下场…...
深度估计中为什么需要已知相机基线(known camera baseline)?
在计算机视觉和立体视觉的上下文中,“已知相机基线”(known camera baseline)的解释 1. 相机基线的定义 相机基线是指两个相机中心之间的距离。在立体视觉系统中,通常有两个相机(或一个相机在不同位置拍摄两张图像&a…...
显卡、Cuda和pytorch兼容问题
这里写目录标题 驱动与CUDA版本兼容性问题1. **驱动与CUDA版本兼容性问题**2. **任务特性与硬件适配差异**3. **优化策略与框架配置差异**4. **散热与功耗限制**5. **数据传输与CPU瓶颈**排查建议总结 查询PyTorch中实际使用的CUDA版本**1. 查询PyTorch中实际使用的CUDA版本***…...
SseEmitter是什么
SseEmitter 是 Spring Framework 中用于实现 Server-Sent Events (SSE) 的一个类。SSE 是一种允许服务器向客户端推送实时更新的技术,特别适合需要从服务器到客户端的单向消息传递场景,如股票价格更新、社交媒体的新消息通知等。 Server-Sent Events (S…...

OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市
OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市分析 OBOO鸥柏品牌推出的AI数字人触摸屏查询触控人脸识别语音交互一体机,是其在智能交互设备领域的又一创新产品。该一体机整合了触摸屏查询、AI人脸识别、AI声源定位语音麦克风,触控交互以…...

第5天-python饼图绘制
一、基础饼图绘制(Matplotlib) 1. 环境准备 python 复制 下载 pip install matplotlib numpy 2. 基础饼图代码 python 复制 下载 import matplotlib.pyplot as plt# 数据准备 labels = [1, 2, 3, 4] sizes = [30, 25, 15, 30] # 各部分占比(总和建议100) colors…...

2023 睿抗机器人开发者大赛CAIP-编程技能赛-本科组(国赛) 解题报告 | 珂学家
前言 题解 2023 睿抗机器人开发者大赛CAIP-编程技能赛-本科组(国赛)。 vp了下,题目挺好的,难度也适中,但是彻底红温了。 第二题,题意不是那么清晰, M i n ( K 1 , K 2 ) Min(K_1, K_2) Min(K1,K2)容易求&#x…...

LabVIEW风机状态实时监测
在当今电子设备高度集成化的时代,设备散热成为关键问题。许多大型设备机箱常采用多个风机协同散热,确保系统稳定运行。一旦风机出现故障,若不能及时察觉,可能导致设备损坏,造成巨大损失。为满足对机箱内风机状态实时监…...

十一、面向对象底层逻辑-Dubbo过滤器Filter接口
一、引言:分布式系统中的可观测性与治理基石 在分布式服务调用链路中,如何在服务调用前后植入通用逻辑(如日志记录、权限校验、性能监控等),是构建可观测、可治理系统的关键需求。Dubbo通过Filter接口实现了面向切面编…...
双检锁(Double-Checked Locking)单例模式
在项目中使用双检锁(Double-Checked Locking)单例模式来管理 JSON 格式化处理对象(如 ObjectMapper 在 Jackson 库中,或 JsonParser 在 Gson 库中)是一种常见的做法。这种模式确保了对象只被创建一次,同时在…...

linux安装nginx和前端部署vue项目
1、打包前端项目 npm run build 执行完后会在根目录下生成一个dist文件夹,这个dist文件夹就是我们后面要部署到nginx的东西。 2、将dist文件夹上传到服务器中 自己建一个目录,上传即可(尽量不要在root目录下,可能涉及权限问题…...
打破次元壁,VR 气象站开启气象学习新姿势
在教育领域,VR 气象站同样发挥着巨大的作用,为气象教学带来了全新的模式,打破了传统教学的次元壁,让学生们以全新的姿势学习气象知识。 在传统的气象教学中,学生们主要通过课本、图片和老师的讲解来学习气象知识。这…...

软件设计师“数据流图”真题考点分析——求三连
数据流图考点分析 1. 考点分值占比与趋势分析 综合知识题分值统计表 年份考题数量分值分值占比考察重点2018111.33%数据流图基本元素2019222.67%数据流图绘制原则2020111.33%数据流图与控制流图的区别2021334.00%数据字典与数据流图的关系2022222.67%分层数据流图的分解原则…...

基于R语言的贝叶斯网络模型实践技术应用:开启科研新视角
在现代科研领域,变量间的因果关系推断是生态学、环境科学、医学等多学科研究的核心问题。然而,传统的统计学方法往往只能揭示变量间的相关关系,而非因果关系。贝叶斯网络作为一种结合图论与统计学理论的新型模型,不仅能够统合多种…...
用 VS Code / PyCharm 编写你的第一个 Python 程序
用ChatGPT做软件测试 编写你的第一个 Python 程序——不只是“Hello, World”,而是构建认知、习惯与未来的起点 “第一行代码,是一个开发者认知世界的方式。” 编程的入门,不只是运行一个字符串输出,更是开始用计算机思维来理解、…...

【Git】远程操作
Git 是一个分布式版本控制系统 可以简单理解为,每个人的电脑上都是一个完整的版本库,这样在工作时,就不需要联网 了,因为版本库就在自己的电脑上。 因此, 多个人协作的方式,譬如说甲在自己的电脑上改了文件…...
低代码AI开发新趋势:Dify平台化开发实战
在人工智能快速发展的今天,AI应用的开发方式也在不断演变。从传统的手写代码到如今的低代码甚至零代码开发,技术的进步让更多的非专业开发者也能轻松上手。本文将带你走进Dify平台化开发的世界,探索如何通过这一强大的低代码AI开发平台&#…...

DeepSpeed简介及加速模型训练
DeepSpeed是由微软开发的开源深度学习优化框架,专注于大规模模型的高效训练与推理。其核心目标是通过系统级优化技术降低显存占用、提升计算效率,并支持千亿级参数的模型训练。 官网链接:deepspeed 训练代码下载:git代码 一、De…...
网络安全面试题(一)
文章目录 一、基础概念与模型1. 什么是通信协议?列举三种常见的网络通信模型?2. 解释OSI七层模型及各层功能3. TCP/IP四层模型与OSI模型的对应关系是什么?4. 五层协议体系结构与TCP/IP模型的区别?5. 什么是面向连接与非面向连接的服务&…...
Linux 内核探秘:从零构建 GPIO 设备驱动程序实战指南
在嵌入式系统开发领域,GPIO(通用输入 / 输出)作为硬件与软件交互的桥梁,是实现设备控制与数据采集的基础。编写高效、稳定的 GPIO 设备驱动程序,对于发挥硬件性能至关重要。本文将深入剖析 Linux 内核中 GPIO 驱动开发…...

openlayer:10点击地图上某些省份利用Overlay实现提示省份名称
实现点击地图上的省份,在点击经纬度坐标位置附近利用Overlay实现提示框提示相关省份名称。本文介绍了如何通过OpenLayers库实现点击地图上的省份,并在点击的经纬度坐标位置附近显示提示框,提示相关省份名称。首先,定义了两个全局变…...

upload-labs通关笔记-第13关 文件上传之白名单POST法
目录 一、白名单过滤 二、%00截断 1.截断原理 2、截断条件 (1)PHP版本 < 5.3.4 (2)magic_quotes_gpc配置为Off (3)代码逻辑存在缺陷 三、源码分析 1、代码审计 (1)文件…...

数据库健康监测器(BHM)实战:如何通过 HTML 报告识别潜在问题
在数据库运维中,健康监测是保障系统稳定性与性能的关键环节。通过 HTML 报告,开发者可以直观查看数据库的运行状态、资源使用情况与潜在风险。 本文将围绕 数据库健康监测器(Database Health Monitor, BHM) 的核心功能展开分析,结合 Prometheus + Grafana + MySQL Export…...
C++(20): 文件输入输出库 —— <fstream>
目录 一、 的核心功能 二、核心类及功能 三、核心操作示例 1. 文本文件写入(ofstream) 2. 文本文件读取(ifstream) 3. 二进制文件操作(fstream) 四、文件打开模式 五、文件指针操作 六、错误处理技巧…...
使用Starrocks制作拉链表
5月1日向ods_order_info插入3条数据: CREATE TABLE ods_order_info(dt string,id string COMMENT 订单编号,total_amount decimal(10,2) COMMENT 订单金额 ) PRIMARY KEY(dt, id) PARTITION BY (dt) DISTRIBUTED BY HASH(id) PROPERTIES ( "replication_num&q…...

Oracle 11g 单实例使用+asm修改主机名导致ORA-29701 故障分析
解决 把服务器名修改为原来的,重启服务器。 故障 建表空间失败。 分析 查看告警日志 ORA-1119 signalled during: create tablespace splex datafile ‘DATA’ size 2000M… Tue May 20 18:04:28 2025 create tablespace splex datafile ‘DATA/option/dataf…...
Spring Boot接口通用返回值设计与实现最佳实践
一、核心返回值模型设计(增强版) package com.chat.common;import com.chat.util.I18nUtil; import com.chat.util.TraceUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter;import java.io.Serializable;/*** 功能: 通…...
DeepSeek 赋能军事:重塑现代战争形态的科技密码
目录 一、引言:AI 浪潮下的军事变革与 DeepSeek 崛起二、DeepSeek 技术原理与特性剖析2.1 核心技术架构2.2 独特优势 三、DeepSeek 在军事侦察中的应用3.1 海量数据快速处理3.2 精准目标识别追踪3.3 预测潜在威胁 四、DeepSeek 在军事指挥决策中的应用4.1 战场态势实…...
day09-新热文章-实时计算
1. 实时计算与定时计算的区别 定时计算:基于固定时间间隔(如每天/小时)处理全量数据,适用于对实时性要求不高的场景。实时计算:持续处理无界数据流,结果实时输出,适用于高实时性场景࿰…...
Elasticsearch面试题带答案
Elasticsearch面试题带答案 Elasticsearch面试题及答案【最新版】Elasticsearch高级面试题大全(2025版),发现网上很多Elasticsearch面试题及答案整理都没有答案,所以花了很长时间搜集,本套Elasticsearch面试题大全,Elasticsearch面试题大汇总,有大量经典的Elasticsearch面…...