【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景
redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 >= 5
本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏,网上资料欠缺,gpt回答不上来的情况下,博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用,什么代码只是单纯跑起来demo的 绝对达不到生产级别。
本文源自csdn博主:孟秋与你 ,博主虽才疏学浅 却也是在资料极少的情况下 ,辛苦研究源码、整理思路 撰写的本文,转载请声明出处。
文章目录
- redisTemplate API的熟悉
- 配置
- redis mq config
- 监听器:
- 定时器
- 优化方向
(本文基于springboot3.3 jdk17 redis6环境,
理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理)
redisTemplate API的熟悉
我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。
(实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景;redisTemplate的优势在于和springboot的完美集成,且不需要考虑通过连接池来管理线程安全问题)
用过redisTemplate的同学应该都会自己封装一下工具类,因为redisTemplate封装的不够好,不管怎么样 我们都需要先看看这个类
redisTemplate.opsForHash(),redisTemplate.opsForValue()
各位应该很熟悉了, stream是一种新引入的格式,那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API
(没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 )
搜到了opsForStream()方法
继续查看方法 如下图: 
这里说明一下,redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者,acknowledge即ack 应答机制 告诉mq已经成功消费了,claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景,pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。
通过api,加上我们掌握的mq基本知识,大概就能理解是怎么一回事了。demo搭建不难,但是代码要上生产,我们就必须考虑消息消费失败了怎么办 该如何重试,也就是说重点的api在acknowledge和pending上面。
一个简单的封装
@Componentpublic class RedisStreamUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 创建消费组** @param key 键名称* @param group 组名称* @return {@link String}*/public String createGroup(String key, String group) {return redisTemplate.opsForStream().createGroup(key, group);}/*** 获取消费者信息** @param key 键名称* @param group 组名称* @return {@link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {return redisTemplate.opsForStream().consumers(key, group);}/*** 查询组信息** @param key 键名称* @return*/public StreamInfo.XInfoGroups queryGroups(String key) {return redisTemplate.opsForStream().groups(key);}/*** 添加Map消息* @param key* @param value*/public String addMap(String key, Map<String, Object> value) {return redisTemplate.opsForStream().add(key, value).getValue();}/*** 读取消息* @param key*/public List<MapRecord<String, Object, Object>> read(String key) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));}/*** 确认消费* @param key* @param group* @param recordIds*/public Long ack(String key, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(key, group, recordIds);}/*** 删除消息* 当一个节点的所有消息都被删除,那么该节点会自动销毁* @param key* @param recordIds*/public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}/*** 判断是否存在key* @param key*/public boolean hasKey(String key) {Boolean flag= redisTemplate.hasKey(key);return flag != null && flag;}}
注意:会有循环依赖的问题,如果没有那就是springboot版本太低,低版本默认是开启允许循环依赖的,高版本默认不允许(2.7已经不允许了 具体版本不记得了)
解决方法1: 在yml配置里面允许循环依赖
server:port: 8586spring:application:name: springboot3-demodata:redis:port: 6579host: 192.168.1.1password: xxxxxxxdatabase: 1lettuce:pool:max-wait: 5000msmax-active: 1000datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=truetype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root
# 允许循环依赖main:allow-circular-references: true
解决方法2:该工具类不交给spring托管 代码如下图所示
在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可,工具类方法变成静态方法

配置
redis mq config
以下代码展示了如何配置多个生产者,也是这个代码最难写, 尤其是Subscription的创建 不能用spring官方文档里面提供的demo!
package com.qiuhuanhen.springboot3demo.redis.config;import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qiuhuanhen.springboot3demo.redis.RedisStreamUtil;
import com.qiuhuanhen.springboot3demo.redis.consumer.RedisConsumer;
import com.qiuhuanhen.springboot3demo.redis.consumer.listener.RedisConsumersListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@Slf4j
public class RedisConfig{@Autowiredprivate RedisStreamUtil redisStreamUtil;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;@Autowiredprivate Map<String, RedisConsumer> redisConsumer;/*** redis序列化** @param redisConnectionFactory* @return {@code RedisTemplate<String, Object>}*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(om,Object.class);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic List<Subscription> subscriptions(RedisConnectionFactory factory) {List<Subscription> subscriptions = new ArrayList<>();subscriptions.add( createSubscription(factory, "orderStream", "orderGroup", "orderConsumer"));subscriptions.add( createSubscription(factory, "productStream", "productGroup", "productConsumer"));return subscriptions;}/*** @param factory* @param streamName 类似 topic* @param groupName 消费组是 Redis Streams 中的一个重要特性,它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。* @param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。* @return*/private Subscription createSubscription(RedisConnectionFactory factory, String streamName, String groupName, String consumerName) {initStream(streamName, groupName);StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量).batchSize(32).executor(threadPoolExecutor)// 轮询拉取消息的时间 (如果流中没有消息,它会等待这么久的时间,然后再次检查。).pollTimeout(Duration.ofSeconds(1)).errorHandler(throwable -> {log.error("[redis MQ handler exception]", throwable);throwable.printStackTrace();}).build();var listenerContainer = StreamMessageListenerContainer.create(factory, options);// 手动ask消息
// Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
// // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 自动ask消息
// Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName),
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 手动创建 核心在于 cancelOnError(t -> false) 出现异常不退出StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要!.cancelOnError(t -> false).build();Subscription subscription = listenerContainer.register(build, new RedisConsumersListener(redisStreamUtil));listenerContainer.start();return subscription;}/*** 初始化流 保证stream流程是正常的** @param key* @param group*/private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);map.put("author", "mengQiu");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}/*** 校验 Redis 版本号,是否满足最低的版本号要求 可自行使用*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 获得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校验最低版本必须大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version));}}
}
我们简单阐述一下上面代码中的initStream方法:
private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);// 先创建一个keymap.put("author", "mengQiu");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//再将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}
先创建了一对K-V, 接着创建了一个消费组,再把K-V删除,剩下的就是消费组了。因为我们在createSubscription的时候声明了消费组,redis stream mq机制如此 如果redis里面没有消费组会直接报错消费组不存在 而不会自动创建 (与rocketMq类似)
那么有同学可能会问 直接createGroup不行吗?第一次创建当然是没问题的,但是后面项目再启动时 就会报错group已存在
聪明的你可能会有疑惑,那先查询组是否存在 再创建不行吗?
我们来看看redisTemplate的api:
redisTemplate.opsForStream().groups(key)
这个是查询消费组信息的api, 如果消费组不存在,会直接报错该消费组不存在。
所以initSream方法是一个小技巧,有点类似于卡bug。
当然 ,如果硬要只使用createGroup方法也不是不可以,加个try catch就好了,但这就相当于除了第一次初始化之外,之后每次启动项目 其实都会发生一次异常。
监听器:
(核心是实现StreamListener接口)
@Slf4j
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {private RedisStreamUtil redisStreamUtil;public RedisConsumersListener(RedisStreamUtil redisStreamUtil) {this.redisStreamUtil = redisStreamUtil;}/*** 监听器** @param message*/@Overridepublic void onMessage(MapRecord<String, String, String> message) {// stream的key值String streamName = message.getStream();//消息IDRecordId recordId = message.getId();//消息内容Map<String, String> msg = message.getValue();// do something 处理 (这里一般是通过设计模式获取实现类方法 统一处理)//逻辑处理完成后,ack消息,删除消息,group为消费组名称StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamName);xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamName, xInfoGroup.groupName(), recordId.getValue()));redisStreamUtil.del(streamName, recordId.getValue());}log.info("【streamName】= " + streamName + ",【recordId】= " + recordId + ",【msg】=" + msg);}
}
感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 (个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘); 不感兴趣可以直接跳到下一目录
===== ====== ====== 踩坑start ===== ==== ===== =====
一开始使用的是receive方法 (被注释的部分)
// 手动ask消息
// Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
// // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
这也是网上使用最多的方法(因为spring给的文档demo就是这么创建的),但是它非常坑!

通过方法名我们可以判断出 receiveAutoAck是会自动ack的,不出异常还好,那如果出现异常呢 如何ack? 所以我们肯定是要手动控制的。

我们可以看看源码 它们的差异:

是的,就是一个是否自动ack的差别。
既然引入了消息队列,那说明数据量是比较大的,所以肯定是需要考虑异常情况下 消息不能丢失的,于是博主在消费时,故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时,pending list不再新增数据,在项目重启后数据又增加了,但是再次消息异常时 pending list又阻塞了,这种现象非常奇怪! 难道一个消息没ack redis stream就阻塞吗?这显然不符合设计。 反复思考后,看起来像是出现异常后就停止了轮询,这个mq就像极了是一次性的。
但是和轮询相关的 也就一个pollTimeout参数,它能掀起多大的火花呢?
于是继续看代码 配置redis mq时,都有哪些api. 使用receive方法后 返回的是一个Subscription ,Subscription类有isActive()方法 ,于是在定时器中打印subsciption.isActive() 发现它竟然为false
于是我们追踪这个方法:

追踪到了StreamPollTask类

如果是task类 那么应该会有run方法 ,我们直接在里面搜run()

run方法里面主要就这两个方法
this.pollState.running();
this.doLoop();
第一个running方法 一眼看到头,没什么东西 ;我们看doLoop() 这个方法看起来是循环执行,如果任务中断了 说明是loop出问题了

里面有行代码:
if (this.cancelSubscriptionOnError.test(ex)) {this.cancel();}
也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行

还记得isActive()方法吗 它正是去判断该状态的.
通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的

StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘:

我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类:

至此,分析完成了闭环,因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用,这个设计个人感觉非常欠佳。
这便是为什么使用以下代码来创建的原因
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要!.cancelOnError(t -> false).build();
===== ====== ====== 踩坑end ===== ==== ===== =====
定时器
代码比较乱 注释代码比较多的原因 不是因为瞎写,而是那些api 在实际业务中可能会使用到,所以特地写在下面了
// 定期处理 pending list 中的消息@Scheduled(cron = "0/20 * * * * ?")public void processPendingMessages() {String streamKey = "orderStream"; // Redis Stream 的键String groupName = "orderGroup"; // 消费者组的名称String consumerName = "orderConsumer"; // 当前消费者的名称for (Subscription each : subscription) {System.out.println(each.isActive());}StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();// 获取 pending list 中未确认的消息概要PendingMessagesSummary pendingSummary = streamOps.pending(streamKey, groupName);// 所有pending消息的数量long totalPendingMessages = pendingSummary.getTotalPendingMessages();if (pendingSummary.getTotalPendingMessages() == 0L) {return;}// 消费组名称String groupName1 = pendingSummary.getGroupName();// pending队列中的最小IDString minMessageId = pendingSummary.minMessageId();// pending队列中的最大IDString maxMessageId = pendingSummary.maxMessageId();if (pendingSummary.getTotalPendingMessages() > 0) {// 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
// PendingMessages pendingMessages = streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed("0", "+"), 10);// 获取 pending list 中具体的消息PendingMessages pendingMessages = streamOps.pending(streamKey, groupName, Range.unbounded(), 10000);int size = pendingMessages.size();// 获取当前批次的消息PendingMessage currentBatchMin = pendingMessages.get(0);PendingMessage currentBatchMax = pendingMessages.get(size-1);pendingMessages.forEach(pendingMessage ->{// 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并dellong deliveryCount = pendingMessage.getTotalDeliveryCount();// 读取每个未确认的消息
// List<MapRecord<String,String,String>> messages = streamOps.read(
// StreamReadOptions.empty(),
// StreamOffset.create(streamKey,ReadOffset.lastConsumed())
StreamOffset.create(streamKey,ReadOffset.from("0"))
// );List<MapRecord<String, String, String>> messages = streamOps.range(streamKey, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(10000));for (MapRecord<String, String, String> message : messages) {try {// 处理消息processMessage(message);// 成功处理后确认消息streamOps.acknowledge(streamKey, groupName, message.getId());streamOps.delete(streamKey, message.getId());} catch (Exception e) {// 处理异常情况e.printStackTrace();}}});}}
至于如何触发就比较简单了,往redis添加一个streamKey即可
@GetMapping("/stream")public String testStream() {String mystream = "";for (int i = 0; i < 10; i++) {Oper oper = new Oper();oper.setTestId(11111111L);oper.setTestDesc("订单消息队列");oper.setVersion(i);oper.setTestXxx(LocalDateTime.now().toString());Map<String, Object> map = new HashMap<>();map.put("oper", oper);try {Thread.sleep(10);mystream = redisStreamUtil.addMap("orderStream", map);} catch (InterruptedException e) {throw new RuntimeException(e);}}return String.valueOf(mystream);}
优化方向
-
建立一个消费者抽象类,定义消费方法
-
建议一个降级处理抽象类,定义补偿方法(即消费失败时的处理)
-
定义spring的properties类 把生产者消费者字段写到里面
-
redis需要部署集群,可在博主的主页搜索哨兵,有哨兵架构教程。
-
实际业务中,消费消息很可能是存入数据库,在入库完成之后 redis ack完成之前,如果这一瞬间突然宕机了,而数据量又非常大,可能会导致消费重复的情况,因为没有完成ack 下次还是会把该数据从pending list里面取出来。
解决方案1 :考虑是加redisson锁
解决方案2:数据库存入消息id字段并建立唯一索引
(唯一索引的魅力体现出来了)
至此,一份生产级别的redis stream mq架构成立。
相关文章:
【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景
redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 > 5 本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏&a…...
初识IDEA
一、IDEA简介 IDEA 全称 IntelliJ IDEA,是 JAVA编程语言开发的集成环境。IntelliJ 在业界被公认为最好的 java开发⼯具 之⼀,尤其在智能代码助⼿、代码⾃动提示、重构、J2EE⽀持、Ant、JUnit、CVS整合、代码审 查⽅⾯。 JetBrains官⽹ : JetBrains: Esse…...
zigbee笔记:十、ZStack(2.3.0-1.4.0)的OSAL使用分析
zigbee笔记:九中,我们已经学会了利用模板,定制自己的个性开发工程,本文为协议栈(ZStack-CC2530-2.3.0-1.4.0)代码使用分析笔记,来进一步掌握协议栈的使用。 一、协议栈使用知识点 1、协调器、路…...
SpringBoot响应式编程(1)Reactor核心
一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应,具备高效的需求管理(即对 “背压(backpressure)”的控制)…...
Java后端处理前端字符串与 JSON 数据:安全拼接与转义技巧
在现代 Web 开发中,前后端数据交互是家常便饭。我们经常需要处理前端传递的字符串和 JSON 数据,并在后端进行加工处理后发送到其他服务。本文将以 Spring Boot 为例,探讨如何安全地拼接字符串和 JSON 数据,并介绍如何避免 JSON 特…...
一文搞懂bfs,dfs和高级图算法
你以为BFS(广度优先搜索)和DFS(深度优先搜索)这两种基础算法,简单到小学数学就能搞定?但真的是这样吗?很多人都这么认为,但真的对吗?今天,我们不只是走马观花…...
【Rust光年纪】Rust异步编程利器:异步DNS、高性能Web服务器一网打尽
构建高效网络应用必备:解读Rust异步编程神器 前言 Rust 是一种快速流行的系统编程语言,它以其内存安全和并发性能而闻名。在 Rust 生态系统中,有许多优秀的库和框架可以帮助开发者构建高性能、可靠的应用程序。本文将介绍几个在 Rust 中备受…...
04学生管理系统(栈)
文章目录 预处理菜单结构体主函数函数声明栈操作功能实现 预处理 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> #include<windows.h> #include<conio.h>#define OVERFLOW -2 #define FALSE 0 #define TRUE 1 #define OK 1 …...
我们如何在centos上部署批量管理工具ansible
1)我们先准备环境、设备 #我们准备一台服务机 (192.168.61.140) #然后准备几天客户机(192.168.61.141 192.168.61.142)这里我们准备两台2)然后我们在客服务机里面添加域名 vi /etc/hosts #添加如下内容 192.…...
如何评估前端代码审查培训计划的有效性?
评估前端代码审查培训计划的有效性可以通过以下方法: 培训前后测试: 在培训前后对学员进行测试,比较结果以评估知识增长。 学员反馈: 通过问卷调查、访谈或开放式反馈收集学员对培训内容、方式和效果的看法。 参与度:…...
使用nvm切换Node.js版本
一、安装nvm nvm(Node Version Manager)是一个用于管理Node.js版本的工具,它允许你在同一台机器上安装和切换多个Node.js版本。 1.安装nvm https://github.com/coreybutler/nvm-windows 访问以上链接到github去下载 点击releases 下载下图…...
x264 编码器 PSNR算法源码分析
PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的图像质量评价指标,用于衡量图像或视频的清晰度和质量。PSNR是基于信号的最大可能功率与影响信号的噪声功率之间的比率。在图像处理领域,PSNR通常用来评估图像压缩或图像增强算法的效果。 PSNR的计算公式是…...
开源web版3D展示工具Online3DViewer
Online3DViewer是一个免费且开源的Web解决方案,它允许用户在浏览器中直接预览和探索3D模型。 以下是关于Online3DViewer的详细介绍: 一、基本概述 定义:Online3DViewer是一个在线3D模型查看器,支持多种3D文件格式,用…...
白骑士的Matlab教学实战项目篇 4.2 信号与图像处理项目
系列目录 上一篇:白骑士的Matlab教学实战项目篇 4.1 数据分析与可视化 信号处理和图像处理是 MATLAB 的重要应用领域,广泛应用于医学、工程、科学研究等领域。以下内容将介绍信号滤波与频域分析、图像增强与分割的基本概念和方法,并通过一个…...
复现、并改进open-mmlab的mmpose详细细节
复现open-mmlab的mmpose详细细节 1.配置环境2.数据处理3.训练4.改进mmpose4.1 快速调试技巧4.2 快速定位4.3 改进backbone4.3.1 使用说明4.3.2 改进案例4.3.2.1 复现mmpose原配置文件4.3.2.2 复现开源项目4.3.2.3 修改配置文件4.3.2.4 修改新模型 4.4 添加auxiliary_head4.4.1 …...
编写兼容Python2.x与3.x代码
编写兼容Python2.x与3.x代码 当我们正处于Python2.x到Python3.x的过渡期时,你可能想过是否可以在不修改任何代码的前提下能同时运行在Python2和3中。这看起来还真是一个合理的诉求,但如何开始呢?哪些Python2代码在3.x解释器执行时容易出状况…...
比特币8.12学习问题
疑问:什么是过滤,什么是offset 没有投钱的情况下,怎么用api 公式:单币分配金额 总资金 / 2/ offset/选币数量,其中2 表示多空 买入滑点(Slippage)是指在执行交易订单时,实际成交…...
解析 Vue 中的app.version、 app.provide 与 app.runWithContext :原理、应用与实例剖析
目录 app.provide app.runWithContext app.version 非 VIP 用户能够通过积分下载博文资源 app.provide 在 Vue 3.0 中,app.provide充当着在应用层级提供全局共享数据或者服务的关键角色。 app.provide(key, value) 这一方法接收两个关键参数,其中 …...
Ubuntu server 命令行跑selenium
背景 自动化测试都是在本机win上使用selenium 跑自动化脚本,但是服务器都是命令行的没有web界面 依赖包部署 apt-get install zlib1g-dev zlib1g## 安装谷歌浏览器 ## 跳到底部,选择其他平台 https://www.google.com/chrome/## ubuntu # dpkg -i google-chrome-stable_…...
刚刚,模糊测试平台SFuzz受到行业认可
近日,中国网络安全产业联盟(CCIA)正式发布了“2024年网络安全优秀创新成果大赛-安全严选专题赛”评选结果,开源网安模糊测试平台SFuzz凭借重大创新能力,得到组委会认可,获本次大赛创新产品优胜奖。 2024年网…...
Infineon BGT60TR13C毫米波雷达Arduino底层驱动详解
1. 项目概述Infineon XENSIV™ BGT60TR13C 是一款集成化60 GHz毫米波雷达传感器芯片,专为低功耗、高精度运动检测与距离测量应用而设计。该器件采用单片集成方案,将60 GHz VCO、发射/接收前端、三通道接收链路(含LNA、Mixer、IF VGAÿ…...
RC4算法逆向实战:从特征识别到魔改对抗
1. RC4算法基础与逆向特征识别 RC4算法作为经典的流加密算法,在CTF竞赛和恶意软件分析中频繁出现。我第一次逆向分析RC4加密的样本时,花了整整三天才确认算法类型——因为当时的我还不熟悉它的特征指纹。现在回头看,识别标准RC4其实有明确的规…...
苹果内购Java后端避坑指南:收据验证、状态码处理和防重复消费实战
苹果内购Java后端深度防御指南:从收据验收到分布式幂等设计 当你的应用内购收入突然出现异常波动,或是用户投诉被重复扣款时,背后往往隐藏着苹果内购接口的"暗礁"。作为经历过百万级内购交易的老兵,我想分享几个真实生产…...
解决Python SocketIO客户端连接问题
引言 在构建一个IT自动化系统时,管理多个电脑的需求日益增强。为了实现这一目标,我采用了以下技术栈: 前端:React 后端****中间层:NodeJS 终端代理:Python 然而,在使用Python实现SocketIO客户端时,我遇到了一个问题:Python客户端无法正常打印连接成功的消息。本文将详…...
清明节海报设计指南:4个要点打造高级感视觉呈现
每到清明临近,总有人为海报设计发愁。想做一张既体面又有格调的清明节海报,打开设计软件却不知从何下手,勉强拼凑出来的效果又总觉得差点意思。要么太过花哨显得不够庄重,要么过于简陋显得敷衍。其实高级感并不难,关键…...
OpenClaw 局域网访问配置文档
OpenClaw 局域网访问配置文档 概述 本文档详细说明了如何配置 OpenClaw 以允许局域网内的其他设备访问,包括所有相关配置参数的作用和说明。 当前配置状态 网关服务信息 服务端口: 18789 绑定模式: lan (局域网访问) 认证方式: password (密码认证) 访问密码: xxxxxx 详细…...
2026届最火的降重复率工具实际效果
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 维普平台针对AIGC技术的引入,制定了严谨的检测规范,在当前学术场景里…...
2024IEEE 《基于二次规划的安全关键型多智能体系统的控制》四旋翼 无人机 MATLAB
2024IEEE 《基于二次规划的安全关键型多智能体系统的控制》四旋翼 无人机 MATLAB 代码复现(文献代码)协同控制 规划 无人机 研究了基于二次规划的安全关键型多智能体系统的控制问题。 每个被控智能体被建模为一个积分器和一个不确定非线性驱动系统的级联…...
BES-XGBoost多变量时间序列预测的‘秃鹰搜索优化算法‘与交叉验证抑制过拟合问题的Mat...
基于秃鹰搜索优化算法优化XGBoost(BES-XGBoost)的多变量时间序列预测 BES-XGBoost多变量时间序列 采用交叉验证抑制过拟合问题 优化参数为迭代次数、最大深度和学习率 matlab代码,注:暂无Matlab版本要求 -- 推荐 2016B 版本及以上 注:采用 XG…...
终极指南:如何用3分钟为Windows换上《蔚蓝档案》风格光标主题
终极指南:如何用3分钟为Windows换上《蔚蓝档案》风格光标主题 【免费下载链接】BlueArchive-Cursors Custom mouse cursor theme based on the school RPG Blue Archive. 项目地址: https://gitcode.com/gh_mirrors/bl/BlueArchive-Cursors 每天面对电脑工作…...
