redis stream restTemplate消息监听队列框架搭建
整体思路
1. pom增加redis依赖;
2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
3. 将消息订阅bean及监听器注册到配置中;
1. pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2. 消息监听器实现代码
package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}
3. redis订阅bean及监听器注册
package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}
4. 测试生产消息 消息监听成功
4.1 生产消息
@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果


相关文章:
redis stream restTemplate消息监听队列框架搭建
整体思路 1. pom增加redis依赖; 2. 消息监听器,实现StreamListener接口,处理消息到达逻辑; 3. 将消息订阅bean及监听器注册到配置中; 1. pom <?xml version"1.0" encoding"UTF-8"?> <…...
【期末不挂科-C++考前速过系列P1】大二C++第1次过程考核(3道简述题&7道代码题)【解析,注释】
前言 大家好吖,欢迎来到 YY 滴C复习系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! YY的《C》专栏YY的《C11》专栏YY的《Lin…...
游戏开发中,你的游戏图片压缩格式使用ASTC了吗
文章目录 ASTC原理:使用要求 ASTC(Adaptive Scalable Texture Compression,自适应可伸缩纹理压缩)是一种高级的纹理压缩技术,由ARM公司开发并推广。它在图形处理领域中因其出色的压缩效率和灵活性而受到广泛关注。 AST…...
【PostgreSQL】数据查询-概述
PostgreSQL数据查询 概述 检索或从数据库中检索数据的命令的过程称为查询。在 SQL 中,SELECT 命令用于指定查询。该命令的一般语法是SELECT [WITH with_queries] SELECT select_list FROM table_expression [sort_specification]一种简单的查询形式为:…...
element input组件自动失去焦点问题解决
最近在 Vue3 ElementPlus 中,使用 el-input 组件时,如果设置了 v-model,那么在每次改变内容后后,input 会自动失去焦点,这样会导致用户无法输入多个字符。 一、问题原因 如上图所示,配置项的 Name 和 Cod…...
鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解
经历的越多,越喜欢简单的生活,干净的东西,清楚的感觉,有结果的事,和说到做到的人。把圈子变小,把语放缓,把心放宽,用心做好手边的事儿,该有的总会有的! 目录 一ÿ…...
pytorch安装
pytoch安装 1. 准备工作1.1 需要提前安装的软件 2. 安装pyTorch我遇到的问题 3. 显卡测试4. CPU与GPU切换方法4.1 创建张量4.2 第一种切换方法4.3 第二种切换方法 1. 准备工作 1.1 需要提前安装的软件 Anaconda 史上最全最详细的Anaconda安装教程CUDA CUDA安装教程࿰…...
GBASE南大通用系统目录表
系统目录由描述数据库结构的表和视图组成。这些表对象有时称为数据字典,它们包含 数据库本身的所有信息。每个系统目录表都包含有关数据库中特定元素的信息。每个数据 库都有它自己的系统目录。 这些主题提供了有关系统目录表的结构、内容和使用的信息。还包含了有关…...
RPCMS跨站脚本漏洞(xss)
CNVD-ID: CNVD-2024-01190 漏洞描述: RPCMS是一个应用软件,一个网站CMS系统。 RPCMS v3.5.5版本存在跨站脚本漏洞,该漏洞源于组件/logs/dopost.html中对用户提供的数据缺乏有效过滤与转义,攻击者可利用该漏洞通过注入精心设计的有效载荷执行…...
Linux进阶命令使用
在 Linux 中,除了常用的基础命令,有一系列进阶命令可以帮助用户更有效地管理系统和执行复杂的任务。以下是一些常见的 Linux 进阶命令及其用法: 文本处理 grep:搜索文本并打印匹配的行。 grep pattern filenameawk:用…...
重定位,进程的创建,线程相关
重定位 进程的重定位指将程序加载到内存中不同的位置执行,在进程换出换入过程中将会发生。通过更新程序中使用的相对地址。 进程的创建——fork() 进程树,在自己的节点下创建进程节点。 使用fork,创建的子进程是父进…...
Java填充Execl模板并返回前端下载
功能:后端使用Java POI填充Execl模板,并返回前端下载 Execl模板如下: 1. Java后端 功能:填充模板EXECL,并返回前端 controller层 package org.huan.controller;import org.huan.dto.ExcelData; import org.huan.util.ExcelT…...
ChatGPT本地部署,学习记录
一、GPT4ALL模型 官网地址: Github:https://github.com/nomic-ai/gpt4all GPT4ALL项目部署简易,但是在运行体验上一般,并且是只调用CPU来进行运算。 看官方文档介绍在嵌入式上有比较大的优势,但是目前个人对嵌入式…...
Find My游戏手柄|苹果Find My技术与手柄结合,智能防丢,全球定位
游戏手柄是一种常见电子游戏机的部件,通过操纵其按钮等,实现对游戏虚拟角色的控制。随着游戏设备硬件的升级换代,现代游戏手柄又增加了:类比摇杆(方向及视角),扳机键以及HOME菜单键等。现在的游…...
2024美赛数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
【Deep Dive: AI Webinar】开放 ChatGPT - 人工智能开放性运作的案例研究
【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容,大致上分成了 3 个大类: 1. 人工智能的开放、风险与挑战(4 篇) 2. 人工智能的治理(总共 12 篇),其中分成了几个子类&…...
Devops相关问题及答案(2024)
1、DevOps 的理念是什么? DevOps是一种组织文化、流程和工具的集合,旨在提高软件交付的速度和质量,通过自动化和持续改进的方法来促进开发(Dev)和运维(Ops)的协作。 DevOps的核心理念包括&…...
掌握Python设计模式,SQL Alchemy打破ORM与模型类的束缚
大家好,反转软件组件之间的依赖关系之所以重要,是因为它有助于降低耦合度和提高模块化程度,进而可以提高软件的可维护性、可扩展性和可测试性。 当组件之间紧密耦合时,对一个组件的更改可能会对其他组件产生意想不到的影响&#…...
性能分析与调优: Linux 磁盘I/O 观测工具
目录 一、实验 1.环境 2.iostat 3.sar 4.pidstat 5.perf 6. biolatency 7. biosnoop 8.iotop、biotop 9.blktrace 10.bpftrace 11.smartctl 二、问题 1.如何查看PSI数据 2.iotop如何安装 3.smartctl如何使用 一、实验 1.环境 (1)主机 …...
Could not erase files or folders:
IDEA删除 git 的 localChanges 内的文件时,提示Could not erase files or folders:。 确认下这个文件是否被打开,忘记关闭了;关闭后可以被删除。(文件被打开的情况下,用操作系统自带的删除,也无法删除成功…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...
