当前位置: 首页 > news >正文

redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

相关文章:

redis stream restTemplate消息监听队列框架搭建

整体思路 1. pom增加redis依赖&#xff1b; 2. 消息监听器&#xff0c;实现StreamListener接口&#xff0c;处理消息到达逻辑&#xff1b; 3. 将消息订阅bean及监听器注册到配置中&#xff1b; 1. pom <?xml version"1.0" encoding"UTF-8"?> <…...

【期末不挂科-C++考前速过系列P1】大二C++第1次过程考核(3道简述题&7道代码题)【解析,注释】

前言 大家好吖&#xff0c;欢迎来到 YY 滴C复习系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的《Lin…...

游戏开发中,你的游戏图片压缩格式使用ASTC了吗

文章目录 ASTC原理&#xff1a;使用要求 ASTC&#xff08;Adaptive Scalable Texture Compression&#xff0c;自适应可伸缩纹理压缩&#xff09;是一种高级的纹理压缩技术&#xff0c;由ARM公司开发并推广。它在图形处理领域中因其出色的压缩效率和灵活性而受到广泛关注。 AST…...

【PostgreSQL】数据查询-概述

PostgreSQL数据查询 概述 检索或从数据库中检索数据的命令的过程称为查询。在 SQL 中&#xff0c;SELECT 命令用于指定查询。该命令的一般语法是SELECT [WITH with_queries] SELECT select_list FROM table_expression [sort_specification]一种简单的查询形式为&#xff1a…...

element input组件自动失去焦点问题解决

最近在 Vue3 ElementPlus 中&#xff0c;使用 el-input 组件时&#xff0c;如果设置了 v-model&#xff0c;那么在每次改变内容后后&#xff0c;input 会自动失去焦点&#xff0c;这样会导致用户无法输入多个字符。 一、问题原因 如上图所示&#xff0c;配置项的 Name 和 Cod…...

鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解

经历的越多&#xff0c;越喜欢简单的生活&#xff0c;干净的东西&#xff0c;清楚的感觉&#xff0c;有结果的事&#xff0c;和说到做到的人。把圈子变小&#xff0c;把语放缓&#xff0c;把心放宽&#xff0c;用心做好手边的事儿&#xff0c;该有的总会有的! 目录 一&#xff…...

pytorch安装

pytoch安装 1. 准备工作1.1 需要提前安装的软件 2. 安装pyTorch我遇到的问题 3. 显卡测试4. CPU与GPU切换方法4.1 创建张量4.2 第一种切换方法4.3 第二种切换方法 1. 准备工作 1.1 需要提前安装的软件 Anaconda 史上最全最详细的Anaconda安装教程CUDA CUDA安装教程&#xff0…...

GBASE南大通用系统目录表

系统目录由描述数据库结构的表和视图组成。这些表对象有时称为数据字典&#xff0c;它们包含 数据库本身的所有信息。每个系统目录表都包含有关数据库中特定元素的信息。每个数据 库都有它自己的系统目录。 这些主题提供了有关系统目录表的结构、内容和使用的信息。还包含了有关…...

RPCMS跨站脚本漏洞(xss)

CNVD-ID: CNVD-2024-01190 漏洞描述: RPCMS是一个应用软件&#xff0c;一个网站CMS系统。 RPCMS v3.5.5版本存在跨站脚本漏洞&#xff0c;该漏洞源于组件/logs/dopost.html中对用户提供的数据缺乏有效过滤与转义&#xff0c;攻击者可利用该漏洞通过注入精心设计的有效载荷执行…...

Linux进阶命令使用

在 Linux 中&#xff0c;除了常用的基础命令&#xff0c;有一系列进阶命令可以帮助用户更有效地管理系统和执行复杂的任务。以下是一些常见的 Linux 进阶命令及其用法&#xff1a; 文本处理 grep&#xff1a;搜索文本并打印匹配的行。 grep pattern filenameawk&#xff1a;用…...

重定位,进程的创建,线程相关

重定位 进程的重定位指将程序加载到内存中不同的位置执行&#xff0c;在进程换出换入过程中将会发生。通过更新程序中使用的相对地址。 进程的创建——fork&#xff08;&#xff09; 进程树&#xff0c;在自己的节点下创建进程节点。 使用fork&#xff0c;创建的子进程是父进…...

Java填充Execl模板并返回前端下载

功能&#xff1a;后端使用Java POI填充Execl模板&#xff0c;并返回前端下载 Execl模板如下&#xff1a; 1. Java后端 功能&#xff1a;填充模板EXECL,并返回前端 controller层 package org.huan.controller;import org.huan.dto.ExcelData; import org.huan.util.ExcelT…...

ChatGPT本地部署,学习记录

一、GPT4ALL模型 官网地址&#xff1a; Github&#xff1a;https://github.com/nomic-ai/gpt4all GPT4ALL项目部署简易&#xff0c;但是在运行体验上一般&#xff0c;并且是只调用CPU来进行运算。 看官方文档介绍在嵌入式上有比较大的优势&#xff0c;但是目前个人对嵌入式…...

Find My游戏手柄|苹果Find My技术与手柄结合,智能防丢,全球定位

游戏手柄是一种常见电子游戏机的部件&#xff0c;通过操纵其按钮等&#xff0c;实现对游戏虚拟角色的控制。随着游戏设备硬件的升级换代&#xff0c;现代游戏手柄又增加了&#xff1a;类比摇杆&#xff08;方向及视角&#xff09;&#xff0c;扳机键以及HOME菜单键等。现在的游…...

2024美赛数学建模思路 - 复盘:光照强度计算的优化模型

文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米&#xff0c;宽为12米&…...

【Deep Dive: AI Webinar】开放 ChatGPT - 人工智能开放性运作的案例研究

【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容&#xff0c;大致上分成了 3 个大类&#xff1a; 1. 人工智能的开放、风险与挑战&#xff08;4 篇&#xff09; 2. 人工智能的治理&#xff08;总共 12 篇&#xff09;&#xff0c;其中分成了几个子类&…...

Devops相关问题及答案(2024)

1、DevOps 的理念是什么&#xff1f; DevOps是一种组织文化、流程和工具的集合&#xff0c;旨在提高软件交付的速度和质量&#xff0c;通过自动化和持续改进的方法来促进开发&#xff08;Dev&#xff09;和运维&#xff08;Ops&#xff09;的协作。 DevOps的核心理念包括&…...

掌握Python设计模式,SQL Alchemy打破ORM与模型类的束缚

大家好&#xff0c;反转软件组件之间的依赖关系之所以重要&#xff0c;是因为它有助于降低耦合度和提高模块化程度&#xff0c;进而可以提高软件的可维护性、可扩展性和可测试性。 当组件之间紧密耦合时&#xff0c;对一个组件的更改可能会对其他组件产生意想不到的影响&#…...

性能分析与调优: Linux 磁盘I/O 观测工具

目录 一、实验 1.环境 2.iostat 3.sar 4.pidstat 5.perf 6. biolatency 7. biosnoop 8.iotop、biotop 9.blktrace 10.bpftrace 11.smartctl 二、问题 1.如何查看PSI数据 2.iotop如何安装 3.smartctl如何使用 一、实验 1.环境 &#xff08;1&#xff09;主机 …...

Could not erase files or folders:

IDEA删除 git 的 localChanges 内的文件时&#xff0c;提示Could not erase files or folders:。 确认下这个文件是否被打开&#xff0c;忘记关闭了&#xff1b;关闭后可以被删除。&#xff08;文件被打开的情况下&#xff0c;用操作系统自带的删除&#xff0c;也无法删除成功…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...