当前位置: 首页 > news >正文

利用Redis的队列模式实现消息的发送和订阅,适合分布式场景,Java实现代码

在Redis中,通常使用发布/订阅模式(Pub/Sub)来进行消息的实时通信。然而,标准的Redis发布/订阅模式并不直接支持确保一条消息只被一台机器消费。在这种模式下,所有订阅了特定频道的客户端都会收到发布的消息。

 

但是,你可以通过一些策略或模式来模拟这种“只在一台机器上消费”的行为。以下是一些可能的方法:

 

1. 使用Redis的分布式锁

发布消息:当消息发布时,使用一个Redis的分布式锁(如RedLock)来确保只有一个消费者能够处理该消息。

处理消息:消费者尝试获取锁。如果成功,则处理消息并释放锁;如果失败,则放弃处理该消息。

2. 队列模式

发布消息:不是直接将消息发布到频道,而是将消息推送到一个Redis列表(List)或有序集合(Sorted Set)中。

消费消息:消费者使用BLPOP、BRPOP或其他阻塞操作从列表中拉取消息。由于这些操作是阻塞的,因此它们会等待直到有消息可用。同时,由于只有一个消费者能够成功地从列表中拉取消息,因此可以确保消息只被一台机器消费。

3. 分布式任务队列

使用更高级的分布式任务队列系统(如Celery、RabbitMQ、Kafka等),这些系统通常提供了更复杂的路由和消息确认机制,可以确保消息只被一台机器消费。

4. 自定义发布/订阅逻辑

在应用层实现自定义的发布/订阅逻辑。例如,你可以使用一个Redis哈希(Hash)来跟踪哪些消息已经被哪台机器消费。当发布消息时,检查哈希以确定是否有机器已经消费了该消息。如果没有,则选择一个消费者并将消息发送到其队列中。

注意事项:

在实现这些策略时,请确保考虑并发性和性能问题。例如,分布式锁和Redis列表操作在高并发环境下可能会成为瓶颈。

确保你的Redis实例是可靠和可扩展的。在分布式系统中,Redis实例可能会成为单点故障或性能瓶颈。考虑使用Redis集群或其他高可用性解决方案来避免这些问题。

根据你的具体需求和场景选择合适的策略。例如,如果你的系统需要处理大量实时消息并且需要确保消息的顺序性,那么使用Redis列表和阻塞操作可能是个好主意。但是,如果你的系统需要更复杂的路由和消息确认机制,那么使用分布式任务队列系统可能更合适。

 

下面介绍用队列模式,简单实现队列如下:

要使用Redis的队列模式实现消息的消费,我们可以使用Jedis库(Java的Redis客户端)来与Redis进行交互。以下是一个简单的Java实现,展示了如何使用Redis的List数据结构(通常使用LPUSH和BRPOP命令)来实现消息的发布和消费。

 

首先,确保你已经添加了Jedis的依赖到你的项目中。如果你使用Maven,可以在pom.xml中添加以下依赖:

 

xml

复制

<dependency>

    <groupId>redis.clients</groupId>

    <artifactId>jedis</artifactId>

    <version>3.7.0</version> <!-- 请检查并使用最新版本 -->

</dependency>

 

 

然后,你可以编写两个简单的Java类,一个用于发布消息(Producer),另一个用于消费消息(Consumer)。

 

Producer(消息发布者)

 

java

复制

import redis.clients.jedis.Jedis;

 

public class RedisMessageProducer {

 

    private final Jedis jedis;

    private final String queueName;

 

    public RedisMessageProducer(Jedis jedis, String queueName) {

        this.jedis = jedis;

        this.queueName = queueName;

    }

 

    public void produce(String message) {

        jedis.lpush(queueName, message);

        System.out.println("Produced: " + message);

    }

 

    public static void main(String[] args) {

        Jedis jedis = new Jedis("localhost"); // 假设Redis服务器运行在本地

        RedisMessageProducer producer = new RedisMessageProducer(jedis, "my-queue");

 

        // 发布消息

        producer.produce("Hello, Redis Queue!");

        producer.produce("Another message for the queue.");

 

        // 关闭连接(在实际应用中,你可能希望使用连接池来管理连接)

        jedis.close();

    }

}

 

 

Consumer(消息消费者)

 

java

复制

import redis.clients.jedis.Jedis;

 

public class RedisMessageConsumer implements Runnable {

 

    private final Jedis jedis;

    private final String queueName;

 

    public RedisMessageConsumer(Jedis jedis, String queueName) {

        this.jedis = jedis;

        this.queueName = queueName;

    }

 

    @Override

    public void run() {

        while (true) { // 无限循环,直到应用程序被终止

            String message = jedis.brpop(0, queueName).get(1); // 阻塞直到有消息可用

            if (message != null) {

                System.out.println("Consumed: " + message);

                // 在这里处理消息...

            }

        }

    }

 

    public static void main(String[] args) {

        Jedis jedis = new Jedis("localhost"); // 假设Redis服务器运行在本地

        RedisMessageConsumer consumer = new RedisMessageConsumer(jedis, "my-queue");

 

        // 在新的线程中运行消费者

        new Thread(consumer).start();

 

        // 注意:这里的main方法不会立即结束,因为消费者在一个无限循环中运行。

        // 在实际应用中,你可能希望以不同的方式管理消费者的生命周期。

    }

}

 

 

注意:

 

在这个例子中,RedisMessageConsumer的main方法启动了一个新线程来运行消费者。在实际应用中,你可能希望以更复杂的方式管理这些线程,例如使用线程池或Spring的@Async注解。

jedis.brpop(0, queueName)中的0表示阻塞的时间(以秒为单位)。传递0意味着它将无限期地阻塞,直到有消息可用。

请确保你的Redis服务器正在运行,并且Java应用程序可以访问它。如果Redis服务器不在本地运行,你需要将Jedis的构造函数中的"localhost"替换为Redis服务器的实际地址。

在实际应用中,你可能还需要处理异常、优雅地关闭连接以及确保在应用程序终止时正确地清理资源。

相关文章:

利用Redis的队列模式实现消息的发送和订阅,适合分布式场景,Java实现代码

在Redis中&#xff0c;通常使用发布/订阅模式&#xff08;Pub/Sub&#xff09;来进行消息的实时通信。然而&#xff0c;标准的Redis发布/订阅模式并不直接支持确保一条消息只被一台机器消费。在这种模式下&#xff0c;所有订阅了特定频道的客户端都会收到发布的消息。 但是&…...

软件下载安装【汇总】

软件下载安装【汇总】 前言版权推荐软件安装【汇总】最后 前言 2024-5-12 21:38:34 以下内容源自《【汇总】》 仅供学习交流使用 版权 禁止其他平台发布时删除以下此话 本文首次发布于CSDN平台 作者是CSDN日星月云 博客主页是https://jsss-1.blog.csdn.net 禁止其他平台发布…...

重定向文件访问(Redirect file access)

重定向文件访问 重定向文件访问是指通过修改文件系统的路径&#xff0c;使对某个文件或目录的访问请求被转到另一个文件或目录。这在系统管理、测试和开发中非常有用&#xff0c;因为它允许您在不修改应用程序或服务配置的情况下&#xff0c;改变文件的实际存储位置。 proot …...

隐私计算(1)数据可信流通

目录 1. 数据可信流通体系 2. 信任的基石 3.数据流通中的不可信风险 可信链条的级联失效&#xff0c;以至于崩塌 4.数据内循环与外循环&#xff1a;传统数据安全的信任基础 4.1内循环 4.2外循环 5. 技术信任 6. 密态计算 7.技术信任 7.1可信数字身份 7.2 使用权跨域…...

果汁机锂电池充电,5V升压12.7V 升压恒压芯片SL1571B

在现代化的日常生活中&#xff0c;果汁机已经逐渐成为了许多家庭厨房的必备电器。随着科技的不断进步&#xff0c;果汁机的性能也在不断提升&#xff0c;其中锂电池的应用更是为果汁机带来了前所未有的便利。而5V升压12.7V升压恒压芯片SL1571B&#xff0c;作为果汁机锂电池充电…...

多个线程多个锁:如何确保线程安全和避免竞争条件

目录 前言 一、确定需要多个锁的场景 1.独立资源保护 2.部分依赖资源 二、避免死锁 三、锁粒度与并发性能 1. 粗粒度锁定 2.细粒度锁定 四、设计策略&#xff1a;减少资源依赖 1.资源分离 2.无锁设计 3.锁合并 五、Demo讲解 总结&#xff1a; 前言 当多个线程需要…...

Linux-笔记 设备树插件

目录 前言&#xff1a; 设备树插件的书写规范&#xff1a; 设备树插件的编译&#xff1a; 内核配置: 应用背景&#xff1a; 举例&#xff1a; 前言&#xff1a; 设备树插件&#xff08;Device Tree Blob Overlay&#xff0c;简称 DTBO&#xff09;是Linux内核和嵌入式系统…...

【排序算法】总结篇

✨✨这些 排序算法都是指的 需要进行比较的排序算法 ✨✨下面都是略微讲解一下思路&#xff0c;如果需要详细了解哪一个排序&#xff0c;点击&#x1f449;链接即可 ✨✨对于时间、空间复杂度、稳定性&#xff0c;希望你&#x1f9d1;‍&#x1f393;能够理解记忆&#x1f9d1;…...

鸿蒙开发文件管理:【@ohos.fileio (文件管理)】

文件管理 该模块提供文件存储管理能力&#xff0c;包括文件基本管理、文件目录管理、文件信息统计、文件流式读写等常用功能。 说明&#xff1a; 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 impor…...

硬件工程师学习规划

背景介绍 当前电子行业中&#xff0c;互联网因为中国人口基数大&#xff0c;得到很快的发展&#xff0c;一越成为世界第一梯队&#xff0c;互联网软件薪资要高于传统制造业硬件的薪资&#xff0c;从各大招聘软件上就能看到&#xff0c;那么为什么软件发展要好于硬件&#xff1…...

esp32 8行代码实现蓝牙音响

目录 硬件准备&#xff1a; 具体代码&#xff1a; 接线&#xff1a; 备注&#xff1a; 八行代码实现简易版蓝牙音响&#xff0c;亲测有效&#xff1a; esp32 DIY蓝牙音响_哔哩哔哩_bilibili 硬件准备&#xff1a; ESP32-wroom、MAX98357音频放大器模块、4欧3瓦小喇叭、杜…...

注册用户如何防止缓存穿透?

注册用户如何防止缓存穿透&#xff1f; 先说明用户注册为什么会发送缓存穿透&#xff1a;用户注册时&#xff0c;需要验证用户名是否已存在&#xff0c;先查缓存&#xff0c;没有再查数据库&#xff0c;还没有才验证通过。高并发的情况下就可能有大量用户同时注册&#xff0c;…...

Presto基础知识

Presto缓存 引入Presto缓存之前 BackgroundHiveSplitLoader 使用底层的文件系统直接进行数据的读写&#xff1b; 引入Presto缓存机制之后&#xff0c;底层的文件系统被被CachingFileSystem 代理一层 CachingFileSystem 有两个子类&#xff0c;根据你选用的底层缓存引擎的不同…...

Ajax + Easy Excel 通过Blob实现导出excel

前端代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><script src"./js/jquery-3.6.0.min.js"></script></head><body><div><button onclick"exportF…...

Qt+qss动态属性改变控件状态切换的样式

先说点基础的吧&#xff0c;qt的样式实现&#xff0c;常见的主要有三种方式&#xff0c;分别为&#xff1a; 1.ui界面中右键样式表直接添加 2.代码中对控件设置样式setStyleSheet 3.外部预设好qss文件&#xff0c;代码中加载后设置样式 实际工作开发中&#xff0c;我推荐使用优…...

纷享销客安全体系:安全运维运营

安全运维运营(Security Operations,SecOps)是指在信息安全管理中负责监控、检测、响应和恢复安全事件的一系列运营活动。它旨在保护组织的信息系统和数据免受安全威胁和攻击的损害。 通过有效的安全运维运营&#xff0c;组织可以及时发现和应对安全威胁&#xff0c;减少安全事…...

富瀚微FH8322 ISP图像调试—BLC校正

1、简单介绍 目录 1、简单介绍 2、调试方法 3、输出结果 富瀚微平台调试有一段时间了&#xff0c;一直没有总结&#xff0c;我们调试ISP的时候&#xff0c;首先一步时确定好sensor的黑电平值&#xff0c;黑电平如果不准&#xff0c;则会影响到后面的颜色及对比度相关模块。…...

什么是大型语言模型 ?

引言 在本文[1]中&#xff0c;我们将从高层次概述大型语言模型 (LLM) 的具体含义。 背景 2023年11月&#xff0c;我偶然间听闻了OpenAI的开发者大会&#xff0c;这个大会展示了人工智能领域的革命性进展&#xff0c;让我深深着迷。怀着对这一领域的浓厚兴趣&#xff0c;我加入了…...

RocketMq详解:二、SpringBoot集成RocketMq

在上一章中我们对Rocket的基础知识、特性以及四大核心组件进行了详细的介绍&#xff0c;本章带着大家一起去在项目中具体的进行应用&#xff0c;并设计将其作为一个工具包只提供消息的分发服务和业务模块进行解耦 在进行本章的学习之前&#xff0c;需要确保你的可以正常启动和…...

【源码】二开版微盘交易系统/贵金属交易平台/微交易系统

二开版微盘交易系统/贵金属交易平台/微交易系统 一套二开前端UI得贵金属微交易系统&#xff0c;前端产品后台可任意更换 此系统框架不是以往的至尊的框架&#xff0c;系统完美运行&#xff0c;K线采用nodejs方式运行 K线结算都正常&#xff0c;附带教程 资源来源:https://www.…...

DroidRun:用自然语言指令重塑Android自动化体验

1. 当Android遇上自然语言&#xff1a;DroidRun如何重新定义自动化 还记得第一次用语音助手控制手机时的惊艳吗&#xff1f;说句话就能定闹钟、发消息&#xff0c;感觉像在演科幻片。但很快你就会发现&#xff0c;这些功能就像快餐店的固定套餐——只能点菜单上有的&#xff0c…...

Qwen3.5小尺寸模型开源,9B碾压GPT开源版,消费级显卡就能跑

AI圈又出大新闻了✨ 阿里通义千问3.5系列小尺寸模型正式亮相&#xff0c;直接打破“小模型能力弱”的固有认知&#xff0c;甚至实现了“以小胜大”的逆袭&#xff0c;本地部署门槛直接拉到平民级&#xff01; 先上核心干货——这次千问3.5一口气推出了4款小尺寸模型&#xff0c…...

s2-pro语音合成教程:支持数字/单位/英文缩写智能朗读技巧

s2-pro语音合成教程&#xff1a;支持数字/单位/英文缩写智能朗读技巧 1. 快速了解s2-pro语音合成 s2-pro是Fish Audio开源的专业级语音合成模型镜像&#xff0c;它能将文本转换为自然流畅的语音。这个工具特别适合需要语音播报、有声读物制作、视频配音等场景的用户。 与普通…...

HP-Socket技术债务管理成熟度提升计划:行动项与时间表

HP-Socket技术债务管理成熟度提升计划&#xff1a;行动项与时间表 【免费下载链接】HP-Socket High Performance TCP/UDP/HTTP Communication Component 项目地址: https://gitcode.com/gh_mirrors/hp/HP-Socket HP-Socket作为高性能TCP/UDP/HTTP通信组件&#xff0c;随…...

Wan2.1-umt5能力展示:模拟计算机组成原理教学问答

Wan2.1-umt5能力展示&#xff1a;模拟计算机组成原理教学问答 最近在尝试用大模型辅助教学&#xff0c;发现了一个挺有意思的镜像——Wan2.1-umt5。它不像常见的聊天模型&#xff0c;更像是一个专门为理解和生成专业内容设计的“专家”。我突发奇想&#xff0c;让它扮演了一回…...

RCLAMP0542T.TCT‌静电保护TVS 二极管阵列 SEMTECH 电子元器件IC 芯片

RCLAMP0542T.TCT‌ 是由 ‌SEMTECH‌ 公司推出的一款超低电容、双通道ESD&#xff08;静电放电&#xff09;保护 TVS 二极管阵列&#xff0c;具备0.45pF 超低电容、5A 浪涌承受能力和超小型 SLP1610P4T 封装&#xff0c;专为高速数据接口设计&#xff0c;广泛应用于通信设备、消…...

3步实现跨次元游戏模组管理:XXMI启动器的多游戏统一解决方案

3步实现跨次元游戏模组管理&#xff1a;XXMI启动器的多游戏统一解决方案 【免费下载链接】XXMI-Launcher Modding platform for GI, HSR, WW and ZZZ 项目地址: https://gitcode.com/gh_mirrors/xx/XXMI-Launcher 还在为《原神》《崩坏&#xff1a;星穹铁道》等多款二次…...

【CDA干货】三个部门三个营收数:1200 万、1150 万、1280 万?企业指标口径不一致,三步破局

财务部报的Q3营收是1200万&#xff0c;运营部那边却是1150万&#xff0c;更离谱的是CEO给投资人看的PPT上写着1280万。这种事儿听起来是不是很离谱&#xff1f;但实际上&#xff0c;数据对不上&#xff0c;这事儿太常见了。表面看是数字打架&#xff0c;实际上是人跟人较劲——…...

如何快速解锁原神60帧限制:免费开源工具终极指南

如何快速解锁原神60帧限制&#xff1a;免费开源工具终极指南 【免费下载链接】genshin-fps-unlock unlocks the 60 fps cap 项目地址: https://gitcode.com/gh_mirrors/ge/genshin-fps-unlock 想要在《原神》中体验120帧甚至更高帧率的流畅游戏画面吗&#xff1f;genshi…...

计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现 基于SpringBoot框架的高校实训室资源预约与信息化管理平台的设计与实现 实验室智能调度与实训过程管理系统

计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现k8svdqb1 &#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。随着高校信息化建设的深入推进&#xff0c;传…...