详解 Redis 队列 实现
Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。
介绍
Redis 的 List 数据结构可以用来实现普通的队列。
生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部,消费者使用 LPOP 或 RPOP 命令从列表的头部或尾部取出消息。
这种方式简单轻量,但缺少一些高级特性,如消息重试、持久化等 。
延时队列
延时队列可以通过 Redis 的 Sorted Set 数据结构来实现。
消息的到期时间作为分数(score),消息内容作为成员(member)。使用 ZADD 命令添加消息,并通过 ZRANGEBYSCORE 命令获取到期的消息进行消费。
这种方式可以保证消息的有序性,并且处理效率非常高 。
异步队列
Redis 的发布/订阅(pub/sub)模式可以实现异步队列。
生产者使用 PUBLISH 命令发送消息到一个频道,消费者使用 SUBSCRIBE 命令订阅频道来接收消息。
这种方式可以支持消息的广播,但消息无法持久化,且可能会出现消息丢失的情况。
Redis Stream 的使用
Redis 5.0 版本引入了 Stream 数据结构,它是一个持久化的、支持消费者组的消息队列。
使用 XADD 命令添加消息,XREAD 或 XREADGROUP 命令读取消息,并通过 XACK 命令确认消息已被处理。
这种方式支持消息的持久化、消费者组的概念以及消息确认机制 。
示例
RedisTemplate来实现队列操作
在Spring Boot中配置和使用RedisTemplate来实现队列操作主要涉及以下几个步骤:
添加依赖:
确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置Redis:
在application.properties或application.yml中配置Redis服务器的连接信息。
application.yml
spring:redis:host: localhostport: 6379
配置RedisTemplate:
创建一个配置类,配置RedisTemplate的序列化器等。
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer序列化keytemplate.setKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer序列化valuetemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 设置hash的key和value序列化方式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 启用事务template.setEnableTransactionSupport(true);return template;}
}
使用RedisTemplate进行队列操作:
注入RedisTemplate并使用它进行队列操作。
@Service
public class RedisQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void enqueue(String queueName, Object value) {redisTemplate.opsForList().rightPush(queueName, value);}public Object dequeue(String queueName) {return redisTemplate.opsForList().leftPop(queueName);}// 其他队列操作...
}
这是一个基本的配置和使用RedisTemplate实现队列操作的流程。
根据你的具体需求,可能还需要配置连接池、密码认证、集群支持等高级特性。此外,对于延时队列和异步队列,可能需要使用Redisson或其他高级特性和库。
实现异步队列
在Spring Boot中,使用Redis实现异步队列通常可以通过发布/订阅模式或列表(List)数据结构来完成。以下是两种实现方式的示例:
使用发布/订阅模式实现异步队列
发布/订阅模式是一种消息通信模式,其中消息生产者(发布者)不会将消息直接发送到特定的接收者(订阅者),而是将消息发布到一个主题。
对这些消息感兴趣的接收者可以订阅这个主题,从而异步接收消息。
配置RedisTemplate:
首先,在Spring Boot应用中配置RedisTemplate,以便于操作Redis。
创建消息生产者:
使用RedisTemplate的convertAndSend方法发布消息到指定的频道。
@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}
创建消息消费者:
创建一个消息监听器,订阅频道并接收消息。
@Service
public class MessageListener {@RabbitListener(queues = "queue.name")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received: " + message);}
}
使用列表(List)数据结构实现异步队列
列表是一种双向链表结构,可以作为队列使用。
生产者可以使用lpush或rpush将消息添加到列表的头部或尾部,消费者可以使用lpop或rpop从列表的头部或尾部取出消息。
配置RedisTemplate:
同样,首先需要配置RedisTemplate。
生产者添加消息到列表:
使用RedisTemplate的opsForList操作列表,将消息压入列表。
@Service
public class AsyncQueueProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void pushMessage(String listKey, String message) {redisTemplate.opsForList().rightPush(listKey, message);}
}
消费者从列表中取出消息:
消费者可以从列表中弹出消息并处理。
@Service
public class AsyncQueueConsumer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void consumeMessage(String listKey) {List<String> messages = redisTemplate.opsForList().range(listKey, 0, -1);for (String message : messages) {// 处理消息System.out.println("Processing: " + message);}}
}
在实际应用中,可以根据业务需求选择使用发布/订阅模式或列表数据结构来实现异步队列。
发布/订阅模式适用于消息广播的场景,而列表数据结构更适用于实现一个简单的任务队列
实现延时队列
在Spring Boot中使用Redis实现延时队列,可以通过Sorted Set数据结构来实现。以下是具体的实现步骤和示例代码:
步骤 1: 添加Redis依赖
首先,确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
步骤 2: 配置Redis
在application.properties或application.yml中配置Redis服务器的连接信息:
application.properties
spring.redis.host=localhost
spring.redis.port=6379
步骤 3: 创建延时队列服务
创建一个服务类来封装延时队列的逻辑:
@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void enqueue(String queueName, String message, long delaySeconds) {double score = System.currentTimeMillis() + delaySeconds * 1000;redisTemplate.opsForZSet().add(queueName, message, score);}public String dequeue(String queueName) {Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());if (!keys.isEmpty()) {String message = keys.iterator().next();redisTemplate.opsForZSet().remove(queueName, message);return message;}return null;}
}
步骤 4: 使用延时队列
在业务逻辑中使用DelayedQueueService来添加和消费延时消息:
@RestController
public class DelayedQueueController {@Autowiredprivate DelayedQueueService delayedQueueService;@PostMapping("/delayed-enqueue")public ResponseEntity<?> enqueueDelayedMessage(@RequestParam String message, @RequestParam long delaySeconds) {String queueName = "delayedQueue";delayedQueueService.enqueue(queueName, message, delaySeconds);return ResponseEntity.ok("Message enqueued with delay " + delaySeconds + " seconds");}@GetMapping("/delayed-dequeue")public ResponseEntity<?> dequeueDelayedMessage() {String queueName = "delayedQueue";String message = delayedQueueService.dequeue(queueName);if (message != null) {return ResponseEntity.ok(message);} else {return ResponseEntity.noContent().build();}}
}
说明
enqueue方法将消息和其预定的延迟时间戳(当前时间 + 延迟时间)作为分数(score)添加到Sorted Set中。
dequeue方法检索分数小于或等于当前时间的所有消息,从Sorted Set中移除并返回第一个消息。
定时任务(如果有)可以用于定期处理到期的消息。
这个简单的实现提供了一个基础的延时队列功能,适用于需要异步处理但具有特定延迟时间的任务。对于更高级的消息队列需求,可能需要考虑使用专业的MQ系统。
Redis Stream实现持久化队列
添加依赖:
确保pom.xml文件中已添加Spring Boot的Redis依赖。
配置Redis:
在application.properties或application.yml中配置Redis服务器的连接信息。
创建消息生产者:使用RedisTemplate的opsForStream()方法添加消息到Stream中。
创建消费者组:
使用XGROUP CREATE命令为Stream创建消费者组,可以指定从哪个消息ID开始消费。
配置消费者:
实现StreamListener接口,编写接收消息的逻辑。
配置Stream监听器容器:
使用StreamMessageListenerContainer来管理消费者,监听特定Stream的消息,并分配给消费者处理。
示例代码如下:
@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String streamKey, Map<String, String> message) {redisTemplate.opsForStream().add(streamKey, message);}
}
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {// 处理接收到的消息}
}
@Configuration
public class RedisStreamConfig {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =new StreamMessageListenerContainer<>(connectionFactory, messageConsumer);container.start();return container;}
}
在上述代码中,MessageProducer用于发送消息到Redis
Stream,MessageConsumer实现了StreamListener接口来接收消息。
RedisStreamConfig配置了StreamMessageListenerContainer,它会启动并监听消息,将收到的消息分发给消费者处理。
注意,为了确保消息的可靠性,可以实现消息确认机制,在消息被成功处理后,通过XACK命令向Redis确认消息已被消费。同时,如果需要处理消息的持久化和回溯,Redis Stream提供了相应的命令来查询历史消息或未确认的消息 。
通过这种方式,Spring Boot应用可以利用Redis Stream构建一个高性能、持久化且支持消费者组的队列系统,适用于多种消息队列场景 。
相关文章:
详解 Redis 队列 实现
Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。 介绍 Redis 的 List 数据结构可以用来实现普通的队列。 生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部…...
分析SQL的count(*)并优化
最近优化过几个慢查询接口的性能,总结了一些心得体会拿出来跟大家一起分享一下,希望对你会有所帮助。 我们使用的数据库是Mysql8,使用的存储引擎是Innodb。这次优化除了优化索引之外,更多的是在优化count(*)。 通常情况下&#…...
Java学习日记(day18)
一、软件的结构 C/S (Client - Server 客户端-服务器端) 典型应用:QQ软件 ,飞秋,印象笔记。 特点: 必须下载特定的客户端程序。服务器端升级,客户端升级。 B/S (Broswer -Server 浏览器端- 服务器端&a…...
Oracle(61)什么是外部表(External Table)?
外部表(External Table)是Oracle数据库中的一种特殊表类型,用于访问存储在外部文件系统中的数据,而不需要将数据实际加载到数据库内部。外部表的主要优势在于允许数据库用户在不移动或复制数据的情况下,直接查询和处理…...
物联网HMI/网关搭载ARM+CODESYS实现软PLC+HMI一体化
物联网HMI/网关搭载CODESYS实现软PLCHMI一体化 硬件:ARM平台,支持STM32/全志T3/RK3568/树莓派等平台 软件:CODESYS V3.5、JMobile Studio CODESYS是一款功能强大的PLC软件编程工具,它支持IEC61131-3标准IL、ST、FBD、LD、CFC、…...
Java中Stream流
Java中Stream流 Stream 使用flatMap处理嵌套集合: 有一个对象列表,每个对象又包含一个列表,可以使用flatMap来“展平”这个结构。 List<List<String>> listOfLists Arrays.asList(Arrays.asList("a", "b"),Arrays.a…...
纯css实现多行文本右下角最后一行展示全部按钮
未展开全部: 展开全部: 综上演示按钮始终保持在最下方 css代码如下: <div class"info-content"><div class"info-text" :class"!showAll ? mle-hidden : "><span class"show-all"…...
WPF篇(17)-ListBox列表控件+ListView数据列表控件
ListBox列表控件 ListBox是一个列表控件,用于显示条目类的数据,默认每行只能显示一个内容项,当然,我们可以通过修改它的数据模板,来自定义每一行(元素)的数据外观,达到显示更多数据…...
HAProxy 全解析:驾驭网络负载均衡与高可用的强大引擎
一、什么是HAproxy HAProxy是一个免费、开源的高性能TCP/HTTP负载均衡器和代理服务器软件,主要用于实现以下功能 一、负载均衡 多种负载均衡算法支持: 轮询(Round Robin):它依次将请求均匀分配到后端的各个服务器。例…...
陶瓷材质的防静电架空地板越来越受欢迎的原因
目前市面上的陶瓷防静电架空地板主要分为两种:钢基和硫酸钙基。前者是以全钢冲孔裸板作为板基,经粘接、固定整型和灌浆的方式加工而成,后者是以复合硫酸钙板为基材,表面粘接防静电陶瓷砖,四周导电PVC边条封边。近年来陶…...
Mariadb数据库本机无密码登录的问题解决
Mariadb数据库本机无密码登录的问题解决 安装了mariadb后,发现Mariadb本机无密码才能登录 百度了很多文章,发现很多人是因为root的plugin设置的值不正确导致的,unix_socket可以不需要密码,mysql_native_password 是正常的。 解…...
校园外卖平台小程序的设计
管理员账户功能包括:系统首页,个人中心,用户管理,商家管理,菜品信息管理,菜品分类管理,购买菜品管理,订单信息管理,系统管理 微信端账号功能包括:系统首页&a…...
Python3 第八十一课 -- urllib
目录 一. 前言 二. urllib.request 三. urllib.error 四. urllib.parse 五. urllib.robotparser 一. 前言 Python urllib 库用于操作网页 URL,并对网页的内容进行抓取处理。 本文主要介绍 Python3 的 urllib。 urllib 包 包含以下几个模块: url…...
Vue 3+Vite+Eectron从入门到实战系列之(五)一后台管理登录页
前面已经讲了不少基础知识,这篇开始,我们进行实操,做个后台管理系统,打包成多端的,可安装的桌面app!!其中,登录,退出的提示信息用系统的提示,不使用elemengplus的弹窗提示!ÿ…...
Docker 网络代理配置及防火墙设置指南
Docker 网络代理配置及防火墙设置指南 背景 在某些环境中,服务器无法直接访问外网,需要通过网络代理进行连接。虽然我们通常会在 /etc/environment 或 /etc/profile 等系统配置文件中直接配置代理,但 Docker 命令无法使用这些配置。例如&am…...
基于PostGIS(Postgres)+Node.js实现的xyz瓦片地图服务器
背景介绍 前两天研究GeoServer发布存储在PostGIS中栅格数据,最终目的是想在PostGIS中存储金字塔瓦片,用GeoServer发布,但是最后经过研究不改GeoServer源码的情况下,好像只支持将大图tif存在PostGIS数据库中进行发布,金…...
浙大数据结构慕课课后题(06-图3 六度空间)
题目要求: 输入格式: 输入第1行给出两个正整数,分别表示社交网络图的结点数N(1<N≤103,表示人数)、边数M(≤33N,表示社交关系数)。随后的M行对应M条边,每行给出一对正…...
Windows File Recovery卡在99%怎么解决?实用指南!
为什么会出现“Windows File Recovery卡在99%”的问题? Windows File Recovery(Windows文件恢复)是微软设计的命令行应用程序。它可以帮助用户从健康/损坏/格式化的存储设备中恢复已删除/丢失的文件。 通过输入相关命令,设置源/…...
数据结构之数组
写在前面 看下数组。 1:巴拉巴拉 数组是一种线性数据结构,使用连续的内存空间来存储数据,存储的数据要求有相同的数据类型,并且每个元素占用的内存空间相同。获取元素速度非常快,为O(1)常量时间复杂度,所…...
springboot集成sensitive-word实现敏感词过滤
文章目录 敏感词过滤方案一:正则表达式方案二:基于DFA算法的敏感词过滤工具框架-sensitive-wordspringboot集成sensitive-word步骤一:引入pom步骤二:自定义配置步骤三:自定义敏感词白名单步骤四:核心方法测…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
