当前位置: 首页 > news >正文

详解 Redis 队列 实现

Redis 是一个高性能的键值存储系统,它的多种数据结构使其能够以不同方式实现队列,包括普通队列、延时队列和异步队列的介绍和示例。

介绍

Redis 的 List 数据结构可以用来实现普通的队列。
生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部,消费者使用 LPOP 或 RPOP 命令从列表的头部或尾部取出消息。

这种方式简单轻量,但缺少一些高级特性,如消息重试、持久化等 。

延时队列

延时队列可以通过 Redis 的 Sorted Set 数据结构来实现。
消息的到期时间作为分数(score),消息内容作为成员(member)。使用 ZADD 命令添加消息,并通过 ZRANGEBYSCORE 命令获取到期的消息进行消费。

这种方式可以保证消息的有序性,并且处理效率非常高 。

异步队列

Redis 的发布/订阅(pub/sub)模式可以实现异步队列。
生产者使用 PUBLISH 命令发送消息到一个频道,消费者使用 SUBSCRIBE 命令订阅频道来接收消息。

这种方式可以支持消息的广播,但消息无法持久化,且可能会出现消息丢失的情况。

Redis Stream 的使用

Redis 5.0 版本引入了 Stream 数据结构,它是一个持久化的、支持消费者组的消息队列。
使用 XADD 命令添加消息,XREAD 或 XREADGROUP 命令读取消息,并通过 XACK 命令确认消息已被处理。

这种方式支持消息的持久化、消费者组的概念以及消息确认机制 。

示例

RedisTemplate来实现队列操作

在Spring Boot中配置和使用RedisTemplate来实现队列操作主要涉及以下几个步骤:

添加依赖:

确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置Redis:

在application.properties或application.yml中配置Redis服务器的连接信息。
application.yml

spring:redis:host: localhostport: 6379

配置RedisTemplate:

创建一个配置类,配置RedisTemplate的序列化器等。

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer序列化keytemplate.setKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer序列化valuetemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 设置hash的key和value序列化方式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));// 启用事务template.setEnableTransactionSupport(true);return template;}
}

使用RedisTemplate进行队列操作:

注入RedisTemplate并使用它进行队列操作。

@Service
public class RedisQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void enqueue(String queueName, Object value) {redisTemplate.opsForList().rightPush(queueName, value);}public Object dequeue(String queueName) {return redisTemplate.opsForList().leftPop(queueName);}// 其他队列操作...
}

这是一个基本的配置和使用RedisTemplate实现队列操作的流程。
根据你的具体需求,可能还需要配置连接池、密码认证、集群支持等高级特性。此外,对于延时队列和异步队列,可能需要使用Redisson或其他高级特性和库。

实现异步队列

在Spring Boot中,使用Redis实现异步队列通常可以通过发布/订阅模式或列表(List)数据结构来完成。以下是两种实现方式的示例:

使用发布/订阅模式实现异步队列

发布/订阅模式是一种消息通信模式,其中消息生产者(发布者)不会将消息直接发送到特定的接收者(订阅者),而是将消息发布到一个主题。

对这些消息感兴趣的接收者可以订阅这个主题,从而异步接收消息。

配置RedisTemplate:

首先,在Spring Boot应用中配置RedisTemplate,以便于操作Redis。

创建消息生产者:

使用RedisTemplate的convertAndSend方法发布消息到指定的频道。

@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}

创建消息消费者:

创建一个消息监听器,订阅频道并接收消息。

@Service
public class MessageListener {@RabbitListener(queues = "queue.name")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received: " + message);}
}

使用列表(List)数据结构实现异步队列

列表是一种双向链表结构,可以作为队列使用。

生产者可以使用lpush或rpush将消息添加到列表的头部或尾部,消费者可以使用lpop或rpop从列表的头部或尾部取出消息。

配置RedisTemplate:

同样,首先需要配置RedisTemplate。

生产者添加消息到列表:

使用RedisTemplate的opsForList操作列表,将消息压入列表。

@Service
public class AsyncQueueProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void pushMessage(String listKey, String message) {redisTemplate.opsForList().rightPush(listKey, message);}
}

消费者从列表中取出消息:

消费者可以从列表中弹出消息并处理。

@Service
public class AsyncQueueConsumer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void consumeMessage(String listKey) {List<String> messages = redisTemplate.opsForList().range(listKey, 0, -1);for (String message : messages) {// 处理消息System.out.println("Processing: " + message);}}
}

在实际应用中,可以根据业务需求选择使用发布/订阅模式或列表数据结构来实现异步队列。
发布/订阅模式适用于消息广播的场景,而列表数据结构更适用于实现一个简单的任务队列

实现延时队列

在Spring Boot中使用Redis实现延时队列,可以通过Sorted Set数据结构来实现。以下是具体的实现步骤和示例代码:

步骤 1: 添加Redis依赖

首先,确保你的pom.xml文件中已经添加了Spring Boot的Redis依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

步骤 2: 配置Redis

在application.properties或application.yml中配置Redis服务器的连接信息:

application.properties

spring.redis.host=localhost
spring.redis.port=6379

步骤 3: 创建延时队列服务

创建一个服务类来封装延时队列的逻辑:

@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void enqueue(String queueName, String message, long delaySeconds) {double score = System.currentTimeMillis() + delaySeconds * 1000;redisTemplate.opsForZSet().add(queueName, message, score);}public String dequeue(String queueName) {Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());if (!keys.isEmpty()) {String message = keys.iterator().next();redisTemplate.opsForZSet().remove(queueName, message);return message;}return null;}
}

步骤 4: 使用延时队列

在业务逻辑中使用DelayedQueueService来添加和消费延时消息:

@RestController
public class DelayedQueueController {@Autowiredprivate DelayedQueueService delayedQueueService;@PostMapping("/delayed-enqueue")public ResponseEntity<?> enqueueDelayedMessage(@RequestParam String message, @RequestParam long delaySeconds) {String queueName = "delayedQueue";delayedQueueService.enqueue(queueName, message, delaySeconds);return ResponseEntity.ok("Message enqueued with delay " + delaySeconds + " seconds");}@GetMapping("/delayed-dequeue")public ResponseEntity<?> dequeueDelayedMessage() {String queueName = "delayedQueue";String message = delayedQueueService.dequeue(queueName);if (message != null) {return ResponseEntity.ok(message);} else {return ResponseEntity.noContent().build();}}
}

说明

enqueue方法将消息和其预定的延迟时间戳(当前时间 + 延迟时间)作为分数(score)添加到Sorted Set中。

dequeue方法检索分数小于或等于当前时间的所有消息,从Sorted Set中移除并返回第一个消息。

定时任务(如果有)可以用于定期处理到期的消息。

这个简单的实现提供了一个基础的延时队列功能,适用于需要异步处理但具有特定延迟时间的任务。对于更高级的消息队列需求,可能需要考虑使用专业的MQ系统。

Redis Stream实现持久化队列

添加依赖:

确保pom.xml文件中已添加Spring Boot的Redis依赖。

配置Redis:

在application.properties或application.yml中配置Redis服务器的连接信息。
创建消息生产者:使用RedisTemplate的opsForStream()方法添加消息到Stream中。

创建消费者组:

使用XGROUP CREATE命令为Stream创建消费者组,可以指定从哪个消息ID开始消费。

配置消费者:

实现StreamListener接口,编写接收消息的逻辑。

配置Stream监听器容器:

使用StreamMessageListenerContainer来管理消费者,监听特定Stream的消息,并分配给消费者处理。

示例代码如下:

@Service
public class MessageProducer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void sendMessage(String streamKey, Map<String, String> message) {redisTemplate.opsForStream().add(streamKey, message);}
}
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {// 处理接收到的消息}
}
@Configuration
public class RedisStreamConfig {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =new StreamMessageListenerContainer<>(connectionFactory, messageConsumer);container.start();return container;}
}

在上述代码中,MessageProducer用于发送消息到Redis
Stream,MessageConsumer实现了StreamListener接口来接收消息。
RedisStreamConfig配置了StreamMessageListenerContainer,它会启动并监听消息,将收到的消息分发给消费者处理。

注意,为了确保消息的可靠性,可以实现消息确认机制,在消息被成功处理后,通过XACK命令向Redis确认消息已被消费。同时,如果需要处理消息的持久化和回溯,Redis Stream提供了相应的命令来查询历史消息或未确认的消息 。

通过这种方式,Spring Boot应用可以利用Redis Stream构建一个高性能、持久化且支持消费者组的队列系统,适用于多种消息队列场景 。

相关文章:

详解 Redis 队列 实现

Redis 是一个高性能的键值存储系统&#xff0c;它的多种数据结构使其能够以不同方式实现队列&#xff0c;包括普通队列、延时队列和异步队列的介绍和示例。 介绍 Redis 的 List 数据结构可以用来实现普通的队列。 生产者使用 LPUSH 或 RPUSH 命令将消息添加到列表的头部或尾部…...

分析SQL的count(*)并优化

最近优化过几个慢查询接口的性能&#xff0c;总结了一些心得体会拿出来跟大家一起分享一下&#xff0c;希望对你会有所帮助。 我们使用的数据库是Mysql8&#xff0c;使用的存储引擎是Innodb。这次优化除了优化索引之外&#xff0c;更多的是在优化count(*)。 通常情况下&#…...

Java学习日记(day18)

一、软件的结构 C/S (Client - Server 客户端-服务器端) 典型应用&#xff1a;QQ软件 &#xff0c;飞秋&#xff0c;印象笔记。 特点&#xff1a; 必须下载特定的客户端程序。服务器端升级&#xff0c;客户端升级。 B/S &#xff08;Broswer -Server 浏览器端- 服务器端&a…...

Oracle(61)什么是外部表(External Table)?

外部表&#xff08;External Table&#xff09;是Oracle数据库中的一种特殊表类型&#xff0c;用于访问存储在外部文件系统中的数据&#xff0c;而不需要将数据实际加载到数据库内部。外部表的主要优势在于允许数据库用户在不移动或复制数据的情况下&#xff0c;直接查询和处理…...

物联网HMI/网关搭载ARM+CODESYS实现软PLC+HMI一体化

物联网HMI/网关搭载CODESYS实现软PLCHMI一体化 硬件&#xff1a;ARM平台&#xff0c;支持STM32/全志T3/RK3568/树莓派等平台 软件&#xff1a;CODESYS V3.5、JMobile Studio CODESYS是一款功能强大的PLC软件编程工具&#xff0c;它支持IEC61131-3标准IL、ST、FBD、LD、CFC、…...

Java中Stream流

Java中Stream流 Stream 使用flatMap处理嵌套集合: 有一个对象列表&#xff0c;每个对象又包含一个列表&#xff0c;可以使用flatMap来“展平”这个结构。 List<List<String>> listOfLists Arrays.asList(Arrays.asList("a", "b"),Arrays.a…...

纯css实现多行文本右下角最后一行展示全部按钮

未展开全部&#xff1a; 展开全部&#xff1a; 综上演示按钮始终保持在最下方 css代码如下&#xff1a; <div class"info-content"><div class"info-text" :class"!showAll ? mle-hidden : "><span class"show-all"…...

WPF篇(17)-ListBox列表控件+ListView数据列表控件

ListBox列表控件 ListBox是一个列表控件&#xff0c;用于显示条目类的数据&#xff0c;默认每行只能显示一个内容项&#xff0c;当然&#xff0c;我们可以通过修改它的数据模板&#xff0c;来自定义每一行&#xff08;元素&#xff09;的数据外观&#xff0c;达到显示更多数据…...

HAProxy 全解析:驾驭网络负载均衡与高可用的强大引擎

一、什么是HAproxy HAProxy是一个免费、开源的高性能TCP/HTTP负载均衡器和代理服务器软件&#xff0c;主要用于实现以下功能 一、负载均衡 多种负载均衡算法支持&#xff1a; 轮询&#xff08;Round Robin&#xff09;&#xff1a;它依次将请求均匀分配到后端的各个服务器。例…...

陶瓷材质的防静电架空地板越来越受欢迎的原因

目前市面上的陶瓷防静电架空地板主要分为两种&#xff1a;钢基和硫酸钙基。前者是以全钢冲孔裸板作为板基&#xff0c;经粘接、固定整型和灌浆的方式加工而成&#xff0c;后者是以复合硫酸钙板为基材&#xff0c;表面粘接防静电陶瓷砖&#xff0c;四周导电PVC边条封边。近年来陶…...

Mariadb数据库本机无密码登录的问题解决

Mariadb数据库本机无密码登录的问题解决 安装了mariadb后&#xff0c;发现Mariadb本机无密码才能登录 百度了很多文章&#xff0c;发现很多人是因为root的plugin设置的值不正确导致的&#xff0c;unix_socket可以不需要密码&#xff0c;mysql_native_password 是正常的。 解…...

校园外卖平台小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;商家管理&#xff0c;菜品信息管理&#xff0c;菜品分类管理&#xff0c;购买菜品管理&#xff0c;订单信息管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&a…...

Python3 第八十一课 -- urllib

目录 一. 前言 二. urllib.request 三. urllib.error 四. urllib.parse 五. urllib.robotparser 一. 前言 Python urllib 库用于操作网页 URL&#xff0c;并对网页的内容进行抓取处理。 本文主要介绍 Python3 的 urllib。 urllib 包 包含以下几个模块&#xff1a; url…...

Vue 3+Vite+Eectron从入门到实战系列之(五)一后台管理登录页

前面已经讲了不少基础知识&#xff0c;这篇开始&#xff0c;我们进行实操&#xff0c;做个后台管理系统&#xff0c;打包成多端的,可安装的桌面app!!其中&#xff0c;登录&#xff0c;退出的提示信息用系统的提示&#xff0c;不使用elemengplus的弹窗提示&#xff01;&#xff…...

Docker 网络代理配置及防火墙设置指南

Docker 网络代理配置及防火墙设置指南 背景 在某些环境中&#xff0c;服务器无法直接访问外网&#xff0c;需要通过网络代理进行连接。虽然我们通常会在 /etc/environment 或 /etc/profile 等系统配置文件中直接配置代理&#xff0c;但 Docker 命令无法使用这些配置。例如&am…...

基于PostGIS(Postgres)+Node.js实现的xyz瓦片地图服务器

背景介绍 前两天研究GeoServer发布存储在PostGIS中栅格数据&#xff0c;最终目的是想在PostGIS中存储金字塔瓦片&#xff0c;用GeoServer发布&#xff0c;但是最后经过研究不改GeoServer源码的情况下&#xff0c;好像只支持将大图tif存在PostGIS数据库中进行发布&#xff0c;金…...

浙大数据结构慕课课后题(06-图3 六度空间)

题目要求&#xff1a; 输入格式: 输入第1行给出两个正整数&#xff0c;分别表示社交网络图的结点数N&#xff08;1<N≤103&#xff0c;表示人数&#xff09;、边数M&#xff08;≤33N&#xff0c;表示社交关系数&#xff09;。随后的M行对应M条边&#xff0c;每行给出一对正…...

Windows File Recovery卡在99%怎么解决?实用指南!

为什么会出现“Windows File Recovery卡在99%”的问题&#xff1f; Windows File Recovery&#xff08;Windows文件恢复&#xff09;是微软设计的命令行应用程序。它可以帮助用户从健康/损坏/格式化的存储设备中恢复已删除/丢失的文件。 通过输入相关命令&#xff0c;设置源/…...

数据结构之数组

写在前面 看下数组。 1&#xff1a;巴拉巴拉 数组是一种线性数据结构&#xff0c;使用连续的内存空间来存储数据&#xff0c;存储的数据要求有相同的数据类型&#xff0c;并且每个元素占用的内存空间相同。获取元素速度非常快&#xff0c;为O(1)常量时间复杂度&#xff0c;所…...

springboot集成sensitive-word实现敏感词过滤

文章目录 敏感词过滤方案一&#xff1a;正则表达式方案二&#xff1a;基于DFA算法的敏感词过滤工具框架-sensitive-wordspringboot集成sensitive-word步骤一&#xff1a;引入pom步骤二&#xff1a;自定义配置步骤三&#xff1a;自定义敏感词白名单步骤四&#xff1a;核心方法测…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

Ubuntu Cursor升级成v1.0

0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开&#xff0c;快捷键也不好用&#xff0c;当看到 Cursor 升级后&#xff0c;还是蛮高兴的 1. 下载 Cursor 下载地址&#xff1a;https://www.cursor.com/cn/downloads 点击下载 Linux (x64) &#xff0c;…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...

第八部分:阶段项目 6:构建 React 前端应用

现在&#xff0c;是时候将你学到的 React 基础知识付诸实践&#xff0c;构建一个简单的前端应用来模拟与后端 API 的交互了。在这个阶段&#xff0c;你可以先使用模拟数据&#xff0c;或者如果你的后端 API&#xff08;阶段项目 5&#xff09;已经搭建好&#xff0c;可以直接连…...

Monorepo架构: Nx Cloud 扩展能力与缓存加速

借助 Nx Cloud 实现项目协同与加速构建 1 &#xff09; 缓存工作原理分析 在了解了本地缓存和远程缓存之后&#xff0c;我们来探究缓存是如何工作的。以计算文件的哈希串为例&#xff0c;若后续运行任务时文件哈希串未变&#xff0c;系统会直接使用对应的输出和制品文件。 2 …...