当前位置: 首页 > news >正文

详解 Redis 队列 实现

Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。

介绍

Redis 的 List 数据结构可以用来实现普通的队列。
生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部,消费者使用 LPOP 或 RPOP 命令从列表的头部或尾部取出消息。

这种方式简单轻量,但缺少一些高级特性,如消息重试、持久化等 。

延时队列

延时队列可以通过 Redis 的 Sorted Set 数据结构来实现。
消息的到期时间作为分数(score),消息内容作为成员(member)。使用 ZADD 命令添加消息,并通过 ZRANGEBYSCORE 命令获取到期的消息进行消费。

这种方式可以保证消息的有序性,并且处理效率非常高 。

异步队列

Redis 的发布/订阅(pub/sub)模式可以实现异步队列。
生产者使用 PUBLISH 命令发送消息到一个频道,消费者使用 SUBSCRIBE 命令订阅频道来接收消息。

这种方式可以支持消息的广播,但消息无法持久化,且可能会出现消息丢失的情况。

Redis Stream 的使用

Redis 5.0 版本引入了 Stream 数据结构,它是一个持久化的、支持消费者组的消息队列。
使用 XADD 命令添加消息,XREAD 或 XREADGROUP 命令读取消息,并通过 XACK 命令确认消息已被处理。

这种方式支持消息的持久化、消费者组的概念以及消息确认机制 。

示例

RedisTemplate来实现队列操作

在Spring Boot中配置和使用RedisTemplate来实现队列操作主要涉及以下几个步骤:

添加依赖:

确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置Redis:

在application.properties或application.yml中配置Redis服务器的连接信息。
application.yml

spring:redis:host: localhostport: 6379

配置RedisTemplate:

创建一个配置类,配置RedisTemplate的序列化器等。

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer序列化keytemplate.setKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer序列化valuetemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 设置hash的key和value序列化方式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 启用事务template.setEnableTransactionSupport(true);return template;}
}

使用RedisTemplate进行队列操作:

注入RedisTemplate并使用它进行队列操作。

@Service
public class RedisQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void enqueue(String queueName, Object value) {redisTemplate.opsForList().rightPush(queueName, value);}public Object dequeue(String queueName) {return redisTemplate.opsForList().leftPop(queueName);}// 其他队列操作...
}

这是一个基本的配置和使用RedisTemplate实现队列操作的流程。
根据你的具体需求,可能还需要配置连接池、密码认证、集群支持等高级特性。此外,对于延时队列和异步队列,可能需要使用Redisson或其他高级特性和库。

实现异步队列

在Spring Boot中,使用Redis实现异步队列通常可以通过发布/订阅模式或列表(List)数据结构来完成。以下是两种实现方式的示例:

使用发布/订阅模式实现异步队列

发布/订阅模式是一种消息通信模式,其中消息生产者(发布者)不会将消息直接发送到特定的接收者(订阅者),而是将消息发布到一个主题。

对这些消息感兴趣的接收者可以订阅这个主题,从而异步接收消息。

配置RedisTemplate:

首先,在Spring Boot应用中配置RedisTemplate,以便于操作Redis。

创建消息生产者:

使用RedisTemplate的convertAndSend方法发布消息到指定的频道。

@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}

创建消息消费者:

创建一个消息监听器,订阅频道并接收消息。

@Service
public class MessageListener {@RabbitListener(queues = "queue.name")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received: " + message);}
}

使用列表(List)数据结构实现异步队列

列表是一种双向链表结构,可以作为队列使用。

生产者可以使用lpush或rpush将消息添加到列表的头部或尾部,消费者可以使用lpop或rpop从列表的头部或尾部取出消息。

配置RedisTemplate:

同样,首先需要配置RedisTemplate。

生产者添加消息到列表:

使用RedisTemplate的opsForList操作列表,将消息压入列表。

@Service
public class AsyncQueueProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void pushMessage(String listKey, String message) {redisTemplate.opsForList().rightPush(listKey, message);}
}

消费者从列表中取出消息:

消费者可以从列表中弹出消息并处理。

@Service
public class AsyncQueueConsumer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void consumeMessage(String listKey) {List<String> messages = redisTemplate.opsForList().range(listKey, 0, -1);for (String message : messages) {// 处理消息System.out.println("Processing: " + message);}}
}

在实际应用中,可以根据业务需求选择使用发布/订阅模式或列表数据结构来实现异步队列。
发布/订阅模式适用于消息广播的场景,而列表数据结构更适用于实现一个简单的任务队列

实现延时队列

在Spring Boot中使用Redis实现延时队列,可以通过Sorted Set数据结构来实现。以下是具体的实现步骤和示例代码:

步骤 1: 添加Redis依赖

首先,确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

步骤 2: 配置Redis

在application.properties或application.yml中配置Redis服务器的连接信息:

application.properties

spring.redis.host=localhost
spring.redis.port=6379

步骤 3: 创建延时队列服务

创建一个服务类来封装延时队列的逻辑:

@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void enqueue(String queueName, String message, long delaySeconds) {double score = System.currentTimeMillis() + delaySeconds * 1000;redisTemplate.opsForZSet().add(queueName, message, score);}public String dequeue(String queueName) {Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());if (!keys.isEmpty()) {String message = keys.iterator().next();redisTemplate.opsForZSet().remove(queueName, message);return message;}return null;}
}

步骤 4: 使用延时队列

在业务逻辑中使用DelayedQueueService来添加和消费延时消息:

@RestController
public class DelayedQueueController {@Autowiredprivate DelayedQueueService delayedQueueService;@PostMapping("/delayed-enqueue")public ResponseEntity<?> enqueueDelayedMessage(@RequestParam String message, @RequestParam long delaySeconds) {String queueName = "delayedQueue";delayedQueueService.enqueue(queueName, message, delaySeconds);return ResponseEntity.ok("Message enqueued with delay " + delaySeconds + " seconds");}@GetMapping("/delayed-dequeue")public ResponseEntity<?> dequeueDelayedMessage() {String queueName = "delayedQueue";String message = delayedQueueService.dequeue(queueName);if (message != null) {return ResponseEntity.ok(message);} else {return ResponseEntity.noContent().build();}}
}

说明

enqueue方法将消息和其预定的延迟时间戳(当前时间 + 延迟时间)作为分数(score)添加到Sorted Set中。

dequeue方法检索分数小于或等于当前时间的所有消息,从Sorted Set中移除并返回第一个消息。

定时任务(如果有)可以用于定期处理到期的消息。

这个简单的实现提供了一个基础的延时队列功能,适用于需要异步处理但具有特定延迟时间的任务。对于更高级的消息队列需求,可能需要考虑使用专业的MQ系统。

Redis Stream实现持久化队列

添加依赖:

确保pom.xml文件中已添加Spring Boot的Redis依赖。

配置Redis:

在application.properties或application.yml中配置Redis服务器的连接信息。
创建消息生产者:使用RedisTemplate的opsForStream()方法添加消息到Stream中。

创建消费者组:

使用XGROUP CREATE命令为Stream创建消费者组,可以指定从哪个消息ID开始消费。

配置消费者:

实现StreamListener接口,编写接收消息的逻辑。

配置Stream监听器容器:

使用StreamMessageListenerContainer来管理消费者,监听特定Stream的消息,并分配给消费者处理。

示例代码如下:

@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String streamKey, Map<String, String> message) {redisTemplate.opsForStream().add(streamKey, message);}
}
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {// 处理接收到的消息}
}
@Configuration
public class RedisStreamConfig {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =new StreamMessageListenerContainer<>(connectionFactory, messageConsumer);container.start();return container;}
}

在上述代码中,MessageProducer用于发送消息到Redis
Stream,MessageConsumer实现了StreamListener接口来接收消息。
RedisStreamConfig配置了StreamMessageListenerContainer,它会启动并监听消息,将收到的消息分发给消费者处理。

注意,为了确保消息的可靠性,可以实现消息确认机制,在消息被成功处理后,通过XACK命令向Redis确认消息已被消费。同时,如果需要处理消息的持久化和回溯,Redis Stream提供了相应的命令来查询历史消息或未确认的消息 。

通过这种方式,Spring Boot应用可以利用Redis Stream构建一个高性能、持久化且支持消费者组的队列系统,适用于多种消息队列场景 。

相关文章:

详解 Redis 队列 实现

Redis 是一个高性能的键值存储系统&#xff0c;它的多种数据结构使其能够以不同方式实现队列&#xff0c;包括普通队列、延时队列和异步队列的介绍和示例。 介绍 Redis 的 List 数据结构可以用来实现普通的队列。 生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部…...

分析SQL的count(*)并优化

最近优化过几个慢查询接口的性能&#xff0c;总结了一些心得体会拿出来跟大家一起分享一下&#xff0c;希望对你会有所帮助。 我们使用的数据库是Mysql8&#xff0c;使用的存储引擎是Innodb。这次优化除了优化索引之外&#xff0c;更多的是在优化count(*)。 通常情况下&#…...

Java学习日记(day18)

一、软件的结构 C/S (Client - Server 客户端-服务器端) 典型应用&#xff1a;QQ软件 &#xff0c;飞秋&#xff0c;印象笔记。 特点&#xff1a; 必须下载特定的客户端程序。服务器端升级&#xff0c;客户端升级。 B/S &#xff08;Broswer -Server 浏览器端- 服务器端&a…...

Oracle(61)什么是外部表(External Table)?

外部表&#xff08;External Table&#xff09;是Oracle数据库中的一种特殊表类型&#xff0c;用于访问存储在外部文件系统中的数据&#xff0c;而不需要将数据实际加载到数据库内部。外部表的主要优势在于允许数据库用户在不移动或复制数据的情况下&#xff0c;直接查询和处理…...

物联网HMI/网关搭载ARM+CODESYS实现软PLC+HMI一体化

物联网HMI/网关搭载CODESYS实现软PLCHMI一体化 硬件&#xff1a;ARM平台&#xff0c;支持STM32/全志T3/RK3568/树莓派等平台 软件&#xff1a;CODESYS V3.5、JMobile Studio CODESYS是一款功能强大的PLC软件编程工具&#xff0c;它支持IEC61131-3标准IL、ST、FBD、LD、CFC、…...

Java中Stream流

Java中Stream流 Stream 使用flatMap处理嵌套集合: 有一个对象列表&#xff0c;每个对象又包含一个列表&#xff0c;可以使用flatMap来“展平”这个结构。 List<List<String>> listOfLists Arrays.asList(Arrays.asList("a", "b"),Arrays.a…...

纯css实现多行文本右下角最后一行展示全部按钮

未展开全部&#xff1a; 展开全部&#xff1a; 综上演示按钮始终保持在最下方 css代码如下&#xff1a; <div class"info-content"><div class"info-text" :class"!showAll ? mle-hidden : "><span class"show-all"…...

WPF篇(17)-ListBox列表控件+ListView数据列表控件

ListBox列表控件 ListBox是一个列表控件&#xff0c;用于显示条目类的数据&#xff0c;默认每行只能显示一个内容项&#xff0c;当然&#xff0c;我们可以通过修改它的数据模板&#xff0c;来自定义每一行&#xff08;元素&#xff09;的数据外观&#xff0c;达到显示更多数据…...

HAProxy 全解析:驾驭网络负载均衡与高可用的强大引擎

一、什么是HAproxy HAProxy是一个免费、开源的高性能TCP/HTTP负载均衡器和代理服务器软件&#xff0c;主要用于实现以下功能 一、负载均衡 多种负载均衡算法支持&#xff1a; 轮询&#xff08;Round Robin&#xff09;&#xff1a;它依次将请求均匀分配到后端的各个服务器。例…...

陶瓷材质的防静电架空地板越来越受欢迎的原因

目前市面上的陶瓷防静电架空地板主要分为两种&#xff1a;钢基和硫酸钙基。前者是以全钢冲孔裸板作为板基&#xff0c;经粘接、固定整型和灌浆的方式加工而成&#xff0c;后者是以复合硫酸钙板为基材&#xff0c;表面粘接防静电陶瓷砖&#xff0c;四周导电PVC边条封边。近年来陶…...

Mariadb数据库本机无密码登录的问题解决

Mariadb数据库本机无密码登录的问题解决 安装了mariadb后&#xff0c;发现Mariadb本机无密码才能登录 百度了很多文章&#xff0c;发现很多人是因为root的plugin设置的值不正确导致的&#xff0c;unix_socket可以不需要密码&#xff0c;mysql_native_password 是正常的。 解…...

校园外卖平台小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;商家管理&#xff0c;菜品信息管理&#xff0c;菜品分类管理&#xff0c;购买菜品管理&#xff0c;订单信息管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&a…...

Python3 第八十一课 -- urllib

目录 一. 前言 二. urllib.request 三. urllib.error 四. urllib.parse 五. urllib.robotparser 一. 前言 Python urllib 库用于操作网页 URL&#xff0c;并对网页的内容进行抓取处理。 本文主要介绍 Python3 的 urllib。 urllib 包 包含以下几个模块&#xff1a; url…...

Vue 3+Vite+Eectron从入门到实战系列之(五)一后台管理登录页

前面已经讲了不少基础知识&#xff0c;这篇开始&#xff0c;我们进行实操&#xff0c;做个后台管理系统&#xff0c;打包成多端的,可安装的桌面app!!其中&#xff0c;登录&#xff0c;退出的提示信息用系统的提示&#xff0c;不使用elemengplus的弹窗提示&#xff01;&#xff…...

Docker 网络代理配置及防火墙设置指南

Docker 网络代理配置及防火墙设置指南 背景 在某些环境中&#xff0c;服务器无法直接访问外网&#xff0c;需要通过网络代理进行连接。虽然我们通常会在 /etc/environment 或 /etc/profile 等系统配置文件中直接配置代理&#xff0c;但 Docker 命令无法使用这些配置。例如&am…...

基于PostGIS(Postgres)+Node.js实现的xyz瓦片地图服务器

背景介绍 前两天研究GeoServer发布存储在PostGIS中栅格数据&#xff0c;最终目的是想在PostGIS中存储金字塔瓦片&#xff0c;用GeoServer发布&#xff0c;但是最后经过研究不改GeoServer源码的情况下&#xff0c;好像只支持将大图tif存在PostGIS数据库中进行发布&#xff0c;金…...

浙大数据结构慕课课后题(06-图3 六度空间)

题目要求&#xff1a; 输入格式: 输入第1行给出两个正整数&#xff0c;分别表示社交网络图的结点数N&#xff08;1<N≤103&#xff0c;表示人数&#xff09;、边数M&#xff08;≤33N&#xff0c;表示社交关系数&#xff09;。随后的M行对应M条边&#xff0c;每行给出一对正…...

Windows File Recovery卡在99%怎么解决?实用指南!

为什么会出现“Windows File Recovery卡在99%”的问题&#xff1f; Windows File Recovery&#xff08;Windows文件恢复&#xff09;是微软设计的命令行应用程序。它可以帮助用户从健康/损坏/格式化的存储设备中恢复已删除/丢失的文件。 通过输入相关命令&#xff0c;设置源/…...

数据结构之数组

写在前面 看下数组。 1&#xff1a;巴拉巴拉 数组是一种线性数据结构&#xff0c;使用连续的内存空间来存储数据&#xff0c;存储的数据要求有相同的数据类型&#xff0c;并且每个元素占用的内存空间相同。获取元素速度非常快&#xff0c;为O(1)常量时间复杂度&#xff0c;所…...

springboot集成sensitive-word实现敏感词过滤

文章目录 敏感词过滤方案一&#xff1a;正则表达式方案二&#xff1a;基于DFA算法的敏感词过滤工具框架-sensitive-wordspringboot集成sensitive-word步骤一&#xff1a;引入pom步骤二&#xff1a;自定义配置步骤三&#xff1a;自定义敏感词白名单步骤四&#xff1a;核心方法测…...

ARM A64指令集架构解析与优化实践

1. A64指令集架构概述A64指令集作为ARMv8-A架构的64位执行状态核心&#xff0c;采用固定32位长度编码设计&#xff0c;这种设计在指令获取和流水线处理上具有显著优势。与传统的变长指令集相比&#xff0c;固定长度编码使得指令预取和译码阶段更加高效&#xff0c;尤其适合现代…...

微型环境传感器技术:PM2.5与VOC检测的突破与应用

1. 个人空气质量监测的技术革命在深圳的一个典型工作日早晨&#xff0c;张工程师像往常一样准备出门上班。他习惯性地查看手机上的空气质量指数&#xff0c;发现室外PM2.5数值高达85μg/m&#xff08;超过WHO安全标准3倍以上&#xff09;。犹豫片刻后&#xff0c;他戴上了N95口…...

深入解析Arm架构TLB维护机制与A64指令集

1. TLB维护机制基础解析在处理器架构中&#xff0c;TLB&#xff08;Translation Lookaside Buffer&#xff09;是内存管理单元&#xff08;MMU&#xff09;的核心组件&#xff0c;负责缓存虚拟地址到物理地址的转换结果。当CPU需要访问内存时&#xff0c;首先会查询TLB获取地址…...

终极指南:NoSQL数据库大全awesome-bigdata - 文档型数据库实战入门 [特殊字符]

终极指南&#xff1a;NoSQL数据库大全awesome-bigdata - 文档型数据库实战入门 &#x1f680; 【免费下载链接】awesome-bigdata A curated list of awesome big data frameworks, ressources and other awesomeness. 项目地址: https://gitcode.com/gh_mirrors/aw/awesome-b…...

Redis++完全指南:C++开发者的终极Redis客户端解决方案

Redis完全指南&#xff1a;C开发者的终极Redis客户端解决方案 【免费下载链接】redis-plus-plus Redis client written in C 项目地址: https://gitcode.com/gh_mirrors/re/redis-plus-plus Redis是一款专为C开发者打造的高性能Redis客户端&#xff0c;它提供了简洁易用…...

商业航天崛起:从SpaceX看工程创新与政策博弈的融合

1. 商业航天崛起的时代背景与技术逻辑2012年5月&#xff0c;当SpaceX的“龙”飞船与国际空间站成功对接时&#xff0c;我正和几位航天领域的同行在会议室里盯着直播画面。那一刻的安静与随后爆发的掌声&#xff0c;不仅仅是为一次技术成功&#xff0c;更是为一个新时代的开启感…...

AI小白必看:打好基础再冲大模型,收藏这份学习路线图!

本文针对想学习AI的学生&#xff0c;强调掌握基础的重要性&#xff0c;避免直接进入大模型学习。文章提出应先理解AI的核心是让机器从数据中学习规律&#xff0c;并掌握数学、编程和数据思维能力。建议从数据处理开始&#xff0c;熟悉Python及常用库&#xff0c;逐步学习机器学…...

经验小波变换(EWT):从理论基石到信号分解实战

1. 经验小波变换&#xff08;EWT&#xff09;的前世今生 我第一次接触EWT是在处理一段轴承振动信号时。当时用传统EMD方法分解出的IMF分量里&#xff0c;高频噪声和故障特征频率完全混在一起&#xff0c;就像把咖啡和牛奶搅成了拿铁——虽然都是白色液体&#xff0c;但根本分不…...

BIOS里找不到SSD硬盘?Win10启动失败?可能是ESP引导分区‘隐身’了,手把手教你用PE盘和DiskGenius把它找回来

BIOS里找不到SSD硬盘&#xff1f;Win10启动失败&#xff1f;可能是ESP引导分区‘隐身’了 最近遇到一个奇怪的故障&#xff1a;明明SSD硬盘在PE系统里能正常识别&#xff0c;但BIOS启动项里却死活找不到它。系统反复提示"reboot and select proper boot device"&…...

Linux调试利器:用addr2line精准定位程序崩溃现场

1. 当程序崩溃时&#xff0c;我们该如何快速定位问题&#xff1f; 作为一名长期奋战在Linux开发一线的程序员&#xff0c;我最头疼的就是遇到程序突然崩溃的情况。那种看着终端输出"Segmentation fault (core dumped)"却无从下手的无力感&#xff0c;相信很多开发者都…...