详解 Redis 队列 实现
Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。
介绍
Redis 的 List 数据结构可以用来实现普通的队列。
生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部,消费者使用 LPOP 或 RPOP 命令从列表的头部或尾部取出消息。
这种方式简单轻量,但缺少一些高级特性,如消息重试、持久化等 。
延时队列
延时队列可以通过 Redis 的 Sorted Set 数据结构来实现。
消息的到期时间作为分数(score),消息内容作为成员(member)。使用 ZADD 命令添加消息,并通过 ZRANGEBYSCORE 命令获取到期的消息进行消费。
这种方式可以保证消息的有序性,并且处理效率非常高 。
异步队列
Redis 的发布/订阅(pub/sub)模式可以实现异步队列。
生产者使用 PUBLISH 命令发送消息到一个频道,消费者使用 SUBSCRIBE 命令订阅频道来接收消息。
这种方式可以支持消息的广播,但消息无法持久化,且可能会出现消息丢失的情况。
Redis Stream 的使用
Redis 5.0 版本引入了 Stream 数据结构,它是一个持久化的、支持消费者组的消息队列。
使用 XADD 命令添加消息,XREAD 或 XREADGROUP 命令读取消息,并通过 XACK 命令确认消息已被处理。
这种方式支持消息的持久化、消费者组的概念以及消息确认机制 。
示例
RedisTemplate来实现队列操作
在Spring Boot中配置和使用RedisTemplate来实现队列操作主要涉及以下几个步骤:
添加依赖:
确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置Redis:
在application.properties或application.yml中配置Redis服务器的连接信息。
application.yml
spring:redis:host: localhostport: 6379
配置RedisTemplate:
创建一个配置类,配置RedisTemplate的序列化器等。
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer序列化keytemplate.setKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer序列化valuetemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 设置hash的key和value序列化方式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 启用事务template.setEnableTransactionSupport(true);return template;}
}
使用RedisTemplate进行队列操作:
注入RedisTemplate并使用它进行队列操作。
@Service
public class RedisQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void enqueue(String queueName, Object value) {redisTemplate.opsForList().rightPush(queueName, value);}public Object dequeue(String queueName) {return redisTemplate.opsForList().leftPop(queueName);}// 其他队列操作...
}
这是一个基本的配置和使用RedisTemplate实现队列操作的流程。
根据你的具体需求,可能还需要配置连接池、密码认证、集群支持等高级特性。此外,对于延时队列和异步队列,可能需要使用Redisson或其他高级特性和库。
实现异步队列
在Spring Boot中,使用Redis实现异步队列通常可以通过发布/订阅模式或列表(List)数据结构来完成。以下是两种实现方式的示例:
使用发布/订阅模式实现异步队列
发布/订阅模式是一种消息通信模式,其中消息生产者(发布者)不会将消息直接发送到特定的接收者(订阅者),而是将消息发布到一个主题。
对这些消息感兴趣的接收者可以订阅这个主题,从而异步接收消息。
配置RedisTemplate:
首先,在Spring Boot应用中配置RedisTemplate,以便于操作Redis。
创建消息生产者:
使用RedisTemplate的convertAndSend方法发布消息到指定的频道。
@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}
创建消息消费者:
创建一个消息监听器,订阅频道并接收消息。
@Service
public class MessageListener {@RabbitListener(queues = "queue.name")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received: " + message);}
}
使用列表(List)数据结构实现异步队列
列表是一种双向链表结构,可以作为队列使用。
生产者可以使用lpush或rpush将消息添加到列表的头部或尾部,消费者可以使用lpop或rpop从列表的头部或尾部取出消息。
配置RedisTemplate:
同样,首先需要配置RedisTemplate。
生产者添加消息到列表:
使用RedisTemplate的opsForList操作列表,将消息压入列表。
@Service
public class AsyncQueueProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void pushMessage(String listKey, String message) {redisTemplate.opsForList().rightPush(listKey, message);}
}
消费者从列表中取出消息:
消费者可以从列表中弹出消息并处理。
@Service
public class AsyncQueueConsumer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void consumeMessage(String listKey) {List<String> messages = redisTemplate.opsForList().range(listKey, 0, -1);for (String message : messages) {// 处理消息System.out.println("Processing: " + message);}}
}
在实际应用中,可以根据业务需求选择使用发布/订阅模式或列表数据结构来实现异步队列。
发布/订阅模式适用于消息广播的场景,而列表数据结构更适用于实现一个简单的任务队列
实现延时队列
在Spring Boot中使用Redis实现延时队列,可以通过Sorted Set数据结构来实现。以下是具体的实现步骤和示例代码:
步骤 1: 添加Redis依赖
首先,确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
步骤 2: 配置Redis
在application.properties或application.yml中配置Redis服务器的连接信息:
application.properties
spring.redis.host=localhost
spring.redis.port=6379
步骤 3: 创建延时队列服务
创建一个服务类来封装延时队列的逻辑:
@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void enqueue(String queueName, String message, long delaySeconds) {double score = System.currentTimeMillis() + delaySeconds * 1000;redisTemplate.opsForZSet().add(queueName, message, score);}public String dequeue(String queueName) {Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());if (!keys.isEmpty()) {String message = keys.iterator().next();redisTemplate.opsForZSet().remove(queueName, message);return message;}return null;}
}
步骤 4: 使用延时队列
在业务逻辑中使用DelayedQueueService来添加和消费延时消息:
@RestController
public class DelayedQueueController {@Autowiredprivate DelayedQueueService delayedQueueService;@PostMapping("/delayed-enqueue")public ResponseEntity<?> enqueueDelayedMessage(@RequestParam String message, @RequestParam long delaySeconds) {String queueName = "delayedQueue";delayedQueueService.enqueue(queueName, message, delaySeconds);return ResponseEntity.ok("Message enqueued with delay " + delaySeconds + " seconds");}@GetMapping("/delayed-dequeue")public ResponseEntity<?> dequeueDelayedMessage() {String queueName = "delayedQueue";String message = delayedQueueService.dequeue(queueName);if (message != null) {return ResponseEntity.ok(message);} else {return ResponseEntity.noContent().build();}}
}
说明
enqueue方法将消息和其预定的延迟时间戳(当前时间 + 延迟时间)作为分数(score)添加到Sorted Set中。
dequeue方法检索分数小于或等于当前时间的所有消息,从Sorted Set中移除并返回第一个消息。
定时任务(如果有)可以用于定期处理到期的消息。
这个简单的实现提供了一个基础的延时队列功能,适用于需要异步处理但具有特定延迟时间的任务。对于更高级的消息队列需求,可能需要考虑使用专业的MQ系统。
Redis Stream实现持久化队列
添加依赖:
确保pom.xml文件中已添加Spring Boot的Redis依赖。
配置Redis:
在application.properties或application.yml中配置Redis服务器的连接信息。
创建消息生产者:使用RedisTemplate的opsForStream()方法添加消息到Stream中。
创建消费者组:
使用XGROUP CREATE命令为Stream创建消费者组,可以指定从哪个消息ID开始消费。
配置消费者:
实现StreamListener接口,编写接收消息的逻辑。
配置Stream监听器容器:
使用StreamMessageListenerContainer来管理消费者,监听特定Stream的消息,并分配给消费者处理。
示例代码如下:
@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String streamKey, Map<String, String> message) {redisTemplate.opsForStream().add(streamKey, message);}
}
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {// 处理接收到的消息}
}
@Configuration
public class RedisStreamConfig {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =new StreamMessageListenerContainer<>(connectionFactory, messageConsumer);container.start();return container;}
}
在上述代码中,MessageProducer用于发送消息到Redis
Stream,MessageConsumer实现了StreamListener接口来接收消息。
RedisStreamConfig配置了StreamMessageListenerContainer,它会启动并监听消息,将收到的消息分发给消费者处理。
注意,为了确保消息的可靠性,可以实现消息确认机制,在消息被成功处理后,通过XACK命令向Redis确认消息已被消费。同时,如果需要处理消息的持久化和回溯,Redis Stream提供了相应的命令来查询历史消息或未确认的消息 。
通过这种方式,Spring Boot应用可以利用Redis Stream构建一个高性能、持久化且支持消费者组的队列系统,适用于多种消息队列场景 。
相关文章:
详解 Redis 队列 实现
Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。 介绍 Redis 的 List 数据结构可以用来实现普通的队列。 生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部…...
分析SQL的count(*)并优化
最近优化过几个慢查询接口的性能,总结了一些心得体会拿出来跟大家一起分享一下,希望对你会有所帮助。 我们使用的数据库是Mysql8,使用的存储引擎是Innodb。这次优化除了优化索引之外,更多的是在优化count(*)。 通常情况下&#…...
Java学习日记(day18)
一、软件的结构 C/S (Client - Server 客户端-服务器端) 典型应用:QQ软件 ,飞秋,印象笔记。 特点: 必须下载特定的客户端程序。服务器端升级,客户端升级。 B/S (Broswer -Server 浏览器端- 服务器端&a…...
Oracle(61)什么是外部表(External Table)?
外部表(External Table)是Oracle数据库中的一种特殊表类型,用于访问存储在外部文件系统中的数据,而不需要将数据实际加载到数据库内部。外部表的主要优势在于允许数据库用户在不移动或复制数据的情况下,直接查询和处理…...
物联网HMI/网关搭载ARM+CODESYS实现软PLC+HMI一体化
物联网HMI/网关搭载CODESYS实现软PLCHMI一体化 硬件:ARM平台,支持STM32/全志T3/RK3568/树莓派等平台 软件:CODESYS V3.5、JMobile Studio CODESYS是一款功能强大的PLC软件编程工具,它支持IEC61131-3标准IL、ST、FBD、LD、CFC、…...
Java中Stream流
Java中Stream流 Stream 使用flatMap处理嵌套集合: 有一个对象列表,每个对象又包含一个列表,可以使用flatMap来“展平”这个结构。 List<List<String>> listOfLists Arrays.asList(Arrays.asList("a", "b"),Arrays.a…...
纯css实现多行文本右下角最后一行展示全部按钮
未展开全部: 展开全部: 综上演示按钮始终保持在最下方 css代码如下: <div class"info-content"><div class"info-text" :class"!showAll ? mle-hidden : "><span class"show-all"…...
WPF篇(17)-ListBox列表控件+ListView数据列表控件
ListBox列表控件 ListBox是一个列表控件,用于显示条目类的数据,默认每行只能显示一个内容项,当然,我们可以通过修改它的数据模板,来自定义每一行(元素)的数据外观,达到显示更多数据…...
HAProxy 全解析:驾驭网络负载均衡与高可用的强大引擎
一、什么是HAproxy HAProxy是一个免费、开源的高性能TCP/HTTP负载均衡器和代理服务器软件,主要用于实现以下功能 一、负载均衡 多种负载均衡算法支持: 轮询(Round Robin):它依次将请求均匀分配到后端的各个服务器。例…...
陶瓷材质的防静电架空地板越来越受欢迎的原因
目前市面上的陶瓷防静电架空地板主要分为两种:钢基和硫酸钙基。前者是以全钢冲孔裸板作为板基,经粘接、固定整型和灌浆的方式加工而成,后者是以复合硫酸钙板为基材,表面粘接防静电陶瓷砖,四周导电PVC边条封边。近年来陶…...
Mariadb数据库本机无密码登录的问题解决
Mariadb数据库本机无密码登录的问题解决 安装了mariadb后,发现Mariadb本机无密码才能登录 百度了很多文章,发现很多人是因为root的plugin设置的值不正确导致的,unix_socket可以不需要密码,mysql_native_password 是正常的。 解…...
校园外卖平台小程序的设计
管理员账户功能包括:系统首页,个人中心,用户管理,商家管理,菜品信息管理,菜品分类管理,购买菜品管理,订单信息管理,系统管理 微信端账号功能包括:系统首页&a…...
Python3 第八十一课 -- urllib
目录 一. 前言 二. urllib.request 三. urllib.error 四. urllib.parse 五. urllib.robotparser 一. 前言 Python urllib 库用于操作网页 URL,并对网页的内容进行抓取处理。 本文主要介绍 Python3 的 urllib。 urllib 包 包含以下几个模块: url…...
Vue 3+Vite+Eectron从入门到实战系列之(五)一后台管理登录页
前面已经讲了不少基础知识,这篇开始,我们进行实操,做个后台管理系统,打包成多端的,可安装的桌面app!!其中,登录,退出的提示信息用系统的提示,不使用elemengplus的弹窗提示!ÿ…...
Docker 网络代理配置及防火墙设置指南
Docker 网络代理配置及防火墙设置指南 背景 在某些环境中,服务器无法直接访问外网,需要通过网络代理进行连接。虽然我们通常会在 /etc/environment 或 /etc/profile 等系统配置文件中直接配置代理,但 Docker 命令无法使用这些配置。例如&am…...
基于PostGIS(Postgres)+Node.js实现的xyz瓦片地图服务器
背景介绍 前两天研究GeoServer发布存储在PostGIS中栅格数据,最终目的是想在PostGIS中存储金字塔瓦片,用GeoServer发布,但是最后经过研究不改GeoServer源码的情况下,好像只支持将大图tif存在PostGIS数据库中进行发布,金…...
浙大数据结构慕课课后题(06-图3 六度空间)
题目要求: 输入格式: 输入第1行给出两个正整数,分别表示社交网络图的结点数N(1<N≤103,表示人数)、边数M(≤33N,表示社交关系数)。随后的M行对应M条边,每行给出一对正…...
Windows File Recovery卡在99%怎么解决?实用指南!
为什么会出现“Windows File Recovery卡在99%”的问题? Windows File Recovery(Windows文件恢复)是微软设计的命令行应用程序。它可以帮助用户从健康/损坏/格式化的存储设备中恢复已删除/丢失的文件。 通过输入相关命令,设置源/…...
数据结构之数组
写在前面 看下数组。 1:巴拉巴拉 数组是一种线性数据结构,使用连续的内存空间来存储数据,存储的数据要求有相同的数据类型,并且每个元素占用的内存空间相同。获取元素速度非常快,为O(1)常量时间复杂度,所…...
springboot集成sensitive-word实现敏感词过滤
文章目录 敏感词过滤方案一:正则表达式方案二:基于DFA算法的敏感词过滤工具框架-sensitive-wordspringboot集成sensitive-word步骤一:引入pom步骤二:自定义配置步骤三:自定义敏感词白名单步骤四:核心方法测…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
