redis stream restTemplate消息监听队列框架搭建
整体思路
1. pom增加redis依赖;
2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
3. 将消息订阅bean及监听器注册到配置中;
1. pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2. 消息监听器实现代码
package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}
3. redis订阅bean及监听器注册
package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}
4. 测试生产消息 消息监听成功
4.1 生产消息
@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果


相关文章:
redis stream restTemplate消息监听队列框架搭建
整体思路 1. pom增加redis依赖; 2. 消息监听器,实现StreamListener接口,处理消息到达逻辑; 3. 将消息订阅bean及监听器注册到配置中; 1. pom <?xml version"1.0" encoding"UTF-8"?> <…...
【期末不挂科-C++考前速过系列P1】大二C++第1次过程考核(3道简述题&7道代码题)【解析,注释】
前言 大家好吖,欢迎来到 YY 滴C复习系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! YY的《C》专栏YY的《C11》专栏YY的《Lin…...
游戏开发中,你的游戏图片压缩格式使用ASTC了吗
文章目录 ASTC原理:使用要求 ASTC(Adaptive Scalable Texture Compression,自适应可伸缩纹理压缩)是一种高级的纹理压缩技术,由ARM公司开发并推广。它在图形处理领域中因其出色的压缩效率和灵活性而受到广泛关注。 AST…...
【PostgreSQL】数据查询-概述
PostgreSQL数据查询 概述 检索或从数据库中检索数据的命令的过程称为查询。在 SQL 中,SELECT 命令用于指定查询。该命令的一般语法是SELECT [WITH with_queries] SELECT select_list FROM table_expression [sort_specification]一种简单的查询形式为:…...
element input组件自动失去焦点问题解决
最近在 Vue3 ElementPlus 中,使用 el-input 组件时,如果设置了 v-model,那么在每次改变内容后后,input 会自动失去焦点,这样会导致用户无法输入多个字符。 一、问题原因 如上图所示,配置项的 Name 和 Cod…...
鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解
经历的越多,越喜欢简单的生活,干净的东西,清楚的感觉,有结果的事,和说到做到的人。把圈子变小,把语放缓,把心放宽,用心做好手边的事儿,该有的总会有的! 目录 一ÿ…...
pytorch安装
pytoch安装 1. 准备工作1.1 需要提前安装的软件 2. 安装pyTorch我遇到的问题 3. 显卡测试4. CPU与GPU切换方法4.1 创建张量4.2 第一种切换方法4.3 第二种切换方法 1. 准备工作 1.1 需要提前安装的软件 Anaconda 史上最全最详细的Anaconda安装教程CUDA CUDA安装教程࿰…...
GBASE南大通用系统目录表
系统目录由描述数据库结构的表和视图组成。这些表对象有时称为数据字典,它们包含 数据库本身的所有信息。每个系统目录表都包含有关数据库中特定元素的信息。每个数据 库都有它自己的系统目录。 这些主题提供了有关系统目录表的结构、内容和使用的信息。还包含了有关…...
RPCMS跨站脚本漏洞(xss)
CNVD-ID: CNVD-2024-01190 漏洞描述: RPCMS是一个应用软件,一个网站CMS系统。 RPCMS v3.5.5版本存在跨站脚本漏洞,该漏洞源于组件/logs/dopost.html中对用户提供的数据缺乏有效过滤与转义,攻击者可利用该漏洞通过注入精心设计的有效载荷执行…...
Linux进阶命令使用
在 Linux 中,除了常用的基础命令,有一系列进阶命令可以帮助用户更有效地管理系统和执行复杂的任务。以下是一些常见的 Linux 进阶命令及其用法: 文本处理 grep:搜索文本并打印匹配的行。 grep pattern filenameawk:用…...
重定位,进程的创建,线程相关
重定位 进程的重定位指将程序加载到内存中不同的位置执行,在进程换出换入过程中将会发生。通过更新程序中使用的相对地址。 进程的创建——fork() 进程树,在自己的节点下创建进程节点。 使用fork,创建的子进程是父进…...
Java填充Execl模板并返回前端下载
功能:后端使用Java POI填充Execl模板,并返回前端下载 Execl模板如下: 1. Java后端 功能:填充模板EXECL,并返回前端 controller层 package org.huan.controller;import org.huan.dto.ExcelData; import org.huan.util.ExcelT…...
ChatGPT本地部署,学习记录
一、GPT4ALL模型 官网地址: Github:https://github.com/nomic-ai/gpt4all GPT4ALL项目部署简易,但是在运行体验上一般,并且是只调用CPU来进行运算。 看官方文档介绍在嵌入式上有比较大的优势,但是目前个人对嵌入式…...
Find My游戏手柄|苹果Find My技术与手柄结合,智能防丢,全球定位
游戏手柄是一种常见电子游戏机的部件,通过操纵其按钮等,实现对游戏虚拟角色的控制。随着游戏设备硬件的升级换代,现代游戏手柄又增加了:类比摇杆(方向及视角),扳机键以及HOME菜单键等。现在的游…...
2024美赛数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
【Deep Dive: AI Webinar】开放 ChatGPT - 人工智能开放性运作的案例研究
【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容,大致上分成了 3 个大类: 1. 人工智能的开放、风险与挑战(4 篇) 2. 人工智能的治理(总共 12 篇),其中分成了几个子类&…...
Devops相关问题及答案(2024)
1、DevOps 的理念是什么? DevOps是一种组织文化、流程和工具的集合,旨在提高软件交付的速度和质量,通过自动化和持续改进的方法来促进开发(Dev)和运维(Ops)的协作。 DevOps的核心理念包括&…...
掌握Python设计模式,SQL Alchemy打破ORM与模型类的束缚
大家好,反转软件组件之间的依赖关系之所以重要,是因为它有助于降低耦合度和提高模块化程度,进而可以提高软件的可维护性、可扩展性和可测试性。 当组件之间紧密耦合时,对一个组件的更改可能会对其他组件产生意想不到的影响&#…...
性能分析与调优: Linux 磁盘I/O 观测工具
目录 一、实验 1.环境 2.iostat 3.sar 4.pidstat 5.perf 6. biolatency 7. biosnoop 8.iotop、biotop 9.blktrace 10.bpftrace 11.smartctl 二、问题 1.如何查看PSI数据 2.iotop如何安装 3.smartctl如何使用 一、实验 1.环境 (1)主机 …...
Could not erase files or folders:
IDEA删除 git 的 localChanges 内的文件时,提示Could not erase files or folders:。 确认下这个文件是否被打开,忘记关闭了;关闭后可以被删除。(文件被打开的情况下,用操作系统自带的删除,也无法删除成功…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
32单片机——基本定时器
STM32F103有众多的定时器,其中包括2个基本定时器(TIM6和TIM7)、4个通用定时器(TIM2~TIM5)、2个高级控制定时器(TIM1和TIM8),这些定时器彼此完全独立,不共享任何资源 1、定…...
