当前位置: 首页 > news >正文

【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景

redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 >= 5

本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏,网上资料欠缺,gpt回答不上来的情况下,博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用,什么代码只是单纯跑起来demo的 绝对达不到生产级别。
本文源自csdn博主:孟秋与你 ,博主虽才疏学浅 却也是在资料极少的情况下 ,辛苦研究源码、整理思路 撰写的本文,转载请声明出处。

文章目录

  • redisTemplate API的熟悉
  • 配置
    • redis mq config
    • 监听器:
    • 定时器
  • 优化方向

(本文基于springboot3.3 jdk17 redis6环境,
理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理)

redisTemplate API的熟悉

我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。
(实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景;redisTemplate的优势在于和springboot的完美集成,且不需要考虑通过连接池来管理线程安全问题)

用过redisTemplate的同学应该都会自己封装一下工具类,因为redisTemplate封装的不够好,不管怎么样 我们都需要先看看这个类

redisTemplate.opsForHash()redisTemplate.opsForValue()

各位应该很熟悉了, stream是一种新引入的格式,那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API
(没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 )

搜到了opsForStream()方法在这里插入图片描述 继续查看方法 如下图: 在这里插入图片描述

这里说明一下,redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者,acknowledge即ack 应答机制 告诉mq已经成功消费了,claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景,pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。

通过api,加上我们掌握的mq基本知识,大概就能理解是怎么一回事了。demo搭建不难,但是代码要上生产,我们就必须考虑消息消费失败了怎么办 该如何重试,也就是说重点的api在acknowledge和pending上面。

一个简单的封装

	@Componentpublic class RedisStreamUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 创建消费组** @param key   键名称* @param group 组名称* @return {@link String}*/public String createGroup(String key, String group) {return redisTemplate.opsForStream().createGroup(key, group);}/*** 获取消费者信息** @param key   键名称* @param group 组名称* @return {@link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {return redisTemplate.opsForStream().consumers(key, group);}/*** 查询组信息** @param key 键名称* @return*/public StreamInfo.XInfoGroups queryGroups(String key) {return redisTemplate.opsForStream().groups(key);}/*** 添加Map消息* @param key* @param value*/public String addMap(String key, Map<String, Object> value) {return redisTemplate.opsForStream().add(key, value).getValue();}/*** 读取消息* @param key*/public List<MapRecord<String, Object, Object>> read(String key) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));}/*** 确认消费* @param key* @param group* @param recordIds*/public Long ack(String key, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(key, group, recordIds);}/*** 删除消息* 当一个节点的所有消息都被删除,那么该节点会自动销毁* @param key* @param recordIds*/public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}/***  判断是否存在key* @param key*/public boolean hasKey(String key) {Boolean flag= redisTemplate.hasKey(key);return flag != null && flag;}}

注意:会有循环依赖的问题,如果没有那就是springboot版本太低,低版本默认是开启允许循环依赖的,高版本默认不允许(2.7已经不允许了 具体版本不记得了)

解决方法1: 在yml配置里面允许循环依赖

server:port: 8586spring:application:name: springboot3-demodata:redis:port: 6579host: 192.168.1.1password: xxxxxxxdatabase: 1lettuce:pool:max-wait: 5000msmax-active: 1000datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=truetype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root
# 允许循环依赖main:allow-circular-references: true

解决方法2:该工具类不交给spring托管 代码如下图所示
在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可,工具类方法变成静态方法
在这里插入图片描述

配置

redis mq config

以下代码展示了如何配置多个生产者,也是这个代码最难写, 尤其是Subscription的创建 不能用spring官方文档里面提供的demo!


package com.qiuhuanhen.springboot3demo.redis.config;import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qiuhuanhen.springboot3demo.redis.RedisStreamUtil;
import com.qiuhuanhen.springboot3demo.redis.consumer.RedisConsumer;
import com.qiuhuanhen.springboot3demo.redis.consumer.listener.RedisConsumersListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@Slf4j
public class RedisConfig{@Autowiredprivate RedisStreamUtil redisStreamUtil;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;@Autowiredprivate Map<String, RedisConsumer> redisConsumer;/*** redis序列化** @param redisConnectionFactory* @return {@code RedisTemplate<String, Object>}*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(om,Object.class);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic List<Subscription> subscriptions(RedisConnectionFactory factory) {List<Subscription> subscriptions = new ArrayList<>();subscriptions.add( createSubscription(factory, "orderStream", "orderGroup", "orderConsumer"));subscriptions.add( createSubscription(factory, "productStream", "productGroup", "productConsumer"));return subscriptions;}/*** @param factory* @param streamName   类似 topic* @param groupName    消费组是 Redis Streams 中的一个重要特性,它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。* @param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。* @return*/private Subscription createSubscription(RedisConnectionFactory factory, String streamName, String groupName, String consumerName) {initStream(streamName, groupName);StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量).batchSize(32).executor(threadPoolExecutor)// 轮询拉取消息的时间 (如果流中没有消息,它会等待这么久的时间,然后再次检查。).pollTimeout(Duration.ofSeconds(1)).errorHandler(throwable -> {log.error("[redis MQ handler exception]", throwable);throwable.printStackTrace();}).build();var listenerContainer = StreamMessageListenerContainer.create(factory, options);// 手动ask消息
//        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
//                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
//                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 自动ask消息
//            Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName),
//                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 手动创建 核心在于 cancelOnError(t -> false)  出现异常不退出StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要!.cancelOnError(t -> false).build();Subscription subscription = listenerContainer.register(build, new RedisConsumersListener(redisStreamUtil));listenerContainer.start();return subscription;}/*** 初始化流 保证stream流程是正常的** @param key* @param group*/private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);map.put("author", "mengQiu");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}/*** 校验 Redis 版本号,是否满足最低的版本号要求 可自行使用*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 获得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校验最低版本必须大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version));}}
}

我们简单阐述一下上面代码中的initStream方法:

    private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);// 先创建一个keymap.put("author", "mengQiu");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//再将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}

先创建了一对K-V, 接着创建了一个消费组,再把K-V删除,剩下的就是消费组了。因为我们在createSubscription的时候声明了消费组,redis stream mq机制如此 如果redis里面没有消费组会直接报错消费组不存在 而不会自动创建 (与rocketMq类似)

那么有同学可能会问 直接createGroup不行吗?第一次创建当然是没问题的,但是后面项目再启动时 就会报错group已存在

聪明的你可能会有疑惑,那先查询组是否存在 再创建不行吗?
我们来看看redisTemplate的api:
redisTemplate.opsForStream().groups(key)
这个是查询消费组信息的api, 如果消费组不存在,会直接报错该消费组不存在。

所以initSream方法是一个小技巧,有点类似于卡bug。

当然 ,如果硬要只使用createGroup方法也不是不可以,加个try catch就好了,但这就相当于除了第一次初始化之外,之后每次启动项目 其实都会发生一次异常。

监听器:

(核心是实现StreamListener接口)

@Slf4j
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {private RedisStreamUtil redisStreamUtil;public RedisConsumersListener(RedisStreamUtil redisStreamUtil) {this.redisStreamUtil = redisStreamUtil;}/*** 监听器** @param message*/@Overridepublic void onMessage(MapRecord<String, String, String> message) {// stream的key值String streamName = message.getStream();//消息IDRecordId recordId = message.getId();//消息内容Map<String, String> msg = message.getValue();// do something 处理 (这里一般是通过设计模式获取实现类方法 统一处理)//逻辑处理完成后,ack消息,删除消息,group为消费组名称StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamName);xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamName, xInfoGroup.groupName(), recordId.getValue()));redisStreamUtil.del(streamName, recordId.getValue());}log.info("【streamName】= " + streamName + ",【recordId】= " + recordId + ",【msg】=" + msg);}
}

感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 (个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘); 不感兴趣可以直接跳到下一目录

===== ====== ====== 踩坑start ===== ==== ===== =====
一开始使用的是receive方法 (被注释的部分)

        // 手动ask消息
//        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
//                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
//                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);

这也是网上使用最多的方法(因为spring给的文档demo就是这么创建的),但是它非常坑!
在这里插入图片描述

通过方法名我们可以判断出 receiveAutoAck是会自动ack的,不出异常还好,那如果出现异常呢 如何ack? 所以我们肯定是要手动控制的。
在这里插入图片描述
我们可以看看源码 它们的差异:
在这里插入图片描述
是的,就是一个是否自动ack的差别。

既然引入了消息队列,那说明数据量是比较大的,所以肯定是需要考虑异常情况下 消息不能丢失的,于是博主在消费时,故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时,pending list不再新增数据,在项目重启后数据又增加了,但是再次消息异常时 pending list又阻塞了,这种现象非常奇怪! 难道一个消息没ack redis stream就阻塞吗?这显然不符合设计。 反复思考后,看起来像是出现异常后就停止了轮询,这个mq就像极了是一次性的。
但是和轮询相关的 也就一个pollTimeout参数,它能掀起多大的火花呢?

于是继续看代码 配置redis mq时,都有哪些api. 使用receive方法后 返回的是一个Subscription ,Subscription类有isActive()方法 ,于是在定时器中打印subsciption.isActive() 发现它竟然为false

于是我们追踪这个方法:
在这里插入图片描述
追踪到了StreamPollTask类
在这里插入图片描述
如果是task类 那么应该会有run方法 ,我们直接在里面搜run()

在这里插入图片描述
run方法里面主要就这两个方法
this.pollState.running();
this.doLoop();
第一个running方法 一眼看到头,没什么东西 ;我们看doLoop() 这个方法看起来是循环执行,如果任务中断了 说明是loop出问题了
在这里插入图片描述
里面有行代码:

    if (this.cancelSubscriptionOnError.test(ex)) {this.cancel();}

也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行
在这里插入图片描述

还记得isActive()方法吗 它正是去判断该状态的.

通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的
在这里插入图片描述

StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘:
在这里插入图片描述

我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类:
在这里插入图片描述
至此,分析完成了闭环,因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用,这个设计个人感觉非常欠佳。

这便是为什么使用以下代码来创建的原因

     StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要!.cancelOnError(t -> false).build();

===== ====== ====== 踩坑end ===== ==== ===== =====

定时器

代码比较乱 注释代码比较多的原因 不是因为瞎写,而是那些api 在实际业务中可能会使用到,所以特地写在下面了

// 定期处理 pending list 中的消息@Scheduled(cron = "0/20 * * * * ?")public void processPendingMessages() {String streamKey = "orderStream"; // Redis Stream 的键String groupName = "orderGroup";  // 消费者组的名称String consumerName = "orderConsumer"; // 当前消费者的名称for (Subscription each : subscription) {System.out.println(each.isActive());}StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();// 获取 pending list 中未确认的消息概要PendingMessagesSummary pendingSummary = streamOps.pending(streamKey, groupName);// 所有pending消息的数量long totalPendingMessages = pendingSummary.getTotalPendingMessages();if (pendingSummary.getTotalPendingMessages() == 0L) {return;}// 消费组名称String groupName1 = pendingSummary.getGroupName();// pending队列中的最小IDString minMessageId = pendingSummary.minMessageId();// pending队列中的最大IDString maxMessageId = pendingSummary.maxMessageId();if (pendingSummary.getTotalPendingMessages() > 0) {// 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
//            PendingMessages pendingMessages = streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed("0", "+"), 10);// 获取 pending list 中具体的消息PendingMessages pendingMessages = streamOps.pending(streamKey, groupName, Range.unbounded(), 10000);int size = pendingMessages.size();// 获取当前批次的消息PendingMessage currentBatchMin = pendingMessages.get(0);PendingMessage currentBatchMax = pendingMessages.get(size-1);pendingMessages.forEach(pendingMessage ->{// 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并dellong deliveryCount = pendingMessage.getTotalDeliveryCount();// 读取每个未确认的消息
//                List<MapRecord<String,String,String>> messages = streamOps.read(
//                        StreamReadOptions.empty(),
//                        StreamOffset.create(streamKey,ReadOffset.lastConsumed())
                        StreamOffset.create(streamKey,ReadOffset.from("0"))
//                );List<MapRecord<String, String, String>> messages = streamOps.range(streamKey, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(10000));for (MapRecord<String, String, String> message : messages) {try {// 处理消息processMessage(message);// 成功处理后确认消息streamOps.acknowledge(streamKey, groupName, message.getId());streamOps.delete(streamKey, message.getId());} catch (Exception e) {// 处理异常情况e.printStackTrace();}}});}}

至于如何触发就比较简单了,往redis添加一个streamKey即可

   @GetMapping("/stream")public String testStream() {String mystream = "";for (int i = 0; i < 10; i++) {Oper oper = new Oper();oper.setTestId(11111111L);oper.setTestDesc("订单消息队列");oper.setVersion(i);oper.setTestXxx(LocalDateTime.now().toString());Map<String, Object> map = new HashMap<>();map.put("oper", oper);try {Thread.sleep(10);mystream = redisStreamUtil.addMap("orderStream", map);} catch (InterruptedException e) {throw new RuntimeException(e);}}return String.valueOf(mystream);}

优化方向

  1. 建立一个消费者抽象类,定义消费方法

  2. 建议一个降级处理抽象类,定义补偿方法(即消费失败时的处理)

  3. 定义spring的properties类 把生产者消费者字段写到里面

  4. redis需要部署集群,可在博主的主页搜索哨兵,有哨兵架构教程。

  5. 实际业务中,消费消息很可能是存入数据库,在入库完成之后 redis ack完成之前,如果这一瞬间突然宕机了,而数据量又非常大,可能会导致消费重复的情况,因为没有完成ack 下次还是会把该数据从pending list里面取出来。

    解决方案1 :考虑是加redisson锁
    解决方案2:数据库存入消息id字段并建立唯一索引
    (唯一索引的魅力体现出来了)

至此,一份生产级别的redis stream mq架构成立。

相关文章:

【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景

redis stream是redis5引入的特性&#xff0c;一定程度上借鉴了kafka等MQ的设计&#xff0c;部署的redis版本必须 > 5 本文主要讲的是思路&#xff0c;结合简单的源码分析&#xff08;放心&#xff0c;无需深入大量源码&#xff09;&#xff1b;讲述在redis stream文档缺乏&a…...

初识IDEA

一、IDEA简介 IDEA 全称 IntelliJ IDEA&#xff0c;是 JAVA编程语言开发的集成环境。IntelliJ 在业界被公认为最好的 java开发⼯具 之⼀&#xff0c;尤其在智能代码助⼿、代码⾃动提示、重构、J2EE⽀持、Ant、JUnit、CVS整合、代码审 查⽅⾯。 JetBrains官⽹ : JetBrains: Esse…...

zigbee笔记:十、ZStack(2.3.0-1.4.0)的OSAL使用分析

zigbee笔记&#xff1a;九中&#xff0c;我们已经学会了利用模板&#xff0c;定制自己的个性开发工程&#xff0c;本文为协议栈&#xff08;ZStack-CC2530-2.3.0-1.4.0&#xff09;代码使用分析笔记&#xff0c;来进一步掌握协议栈的使用。 一、协议栈使用知识点 1、协调器、路…...

SpringBoot响应式编程(1)Reactor核心

一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架&#xff0c;Webflux 底层使用的也是该框架&#xff0c;其通过流的方式实现了异步相应&#xff0c;具备高效的需求管理&#xff08;即对 “背压&#xff08;backpressure&#xff09;”的控制&#xff09…...

Java后端处理前端字符串与 JSON 数据:安全拼接与转义技巧

在现代 Web 开发中&#xff0c;前后端数据交互是家常便饭。我们经常需要处理前端传递的字符串和 JSON 数据&#xff0c;并在后端进行加工处理后发送到其他服务。本文将以 Spring Boot 为例&#xff0c;探讨如何安全地拼接字符串和 JSON 数据&#xff0c;并介绍如何避免 JSON 特…...

一文搞懂bfs,dfs和高级图算法

你以为BFS&#xff08;广度优先搜索&#xff09;和DFS&#xff08;深度优先搜索&#xff09;这两种基础算法&#xff0c;简单到小学数学就能搞定&#xff1f;但真的是这样吗&#xff1f;很多人都这么认为&#xff0c;但真的对吗&#xff1f;今天&#xff0c;我们不只是走马观花…...

【Rust光年纪】Rust异步编程利器:异步DNS、高性能Web服务器一网打尽

构建高效网络应用必备&#xff1a;解读Rust异步编程神器 前言 Rust 是一种快速流行的系统编程语言&#xff0c;它以其内存安全和并发性能而闻名。在 Rust 生态系统中&#xff0c;有许多优秀的库和框架可以帮助开发者构建高性能、可靠的应用程序。本文将介绍几个在 Rust 中备受…...

04学生管理系统(栈)

文章目录 预处理菜单结构体主函数函数声明栈操作功能实现 预处理 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> #include<windows.h> #include<conio.h>#define OVERFLOW -2 #define FALSE 0 #define TRUE 1 #define OK 1 …...

我们如何在centos上部署批量管理工具ansible

1&#xff09;我们先准备环境、设备 #我们准备一台服务机 &#xff08;192.168.61.140&#xff09; ​#然后准备几天客户机&#xff08;192.168.61.141 192.168.61.142&#xff09;这里我们准备两台2)然后我们在客服务机里面添加域名 vi /etc/hosts ​ #添加如下内容 192.…...

如何评估前端代码审查培训计划的有效性?

评估前端代码审查培训计划的有效性可以通过以下方法&#xff1a; 培训前后测试&#xff1a; 在培训前后对学员进行测试&#xff0c;比较结果以评估知识增长。 学员反馈&#xff1a; 通过问卷调查、访谈或开放式反馈收集学员对培训内容、方式和效果的看法。 参与度&#xff1a…...

使用nvm切换Node.js版本

一、安装nvm nvm&#xff08;Node Version Manager&#xff09;是一个用于管理Node.js版本的工具&#xff0c;它允许你在同一台机器上安装和切换多个Node.js版本。 1.安装nvm https://github.com/coreybutler/nvm-windows 访问以上链接到github去下载 点击releases 下载下图…...

x264 编码器 PSNR算法源码分析

PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的图像质量评价指标,用于衡量图像或视频的清晰度和质量。PSNR是基于信号的最大可能功率与影响信号的噪声功率之间的比率。在图像处理领域,PSNR通常用来评估图像压缩或图像增强算法的效果。 PSNR的计算公式是…...

开源web版3D展示工具Online3DViewer

Online3DViewer是一个免费且开源的Web解决方案&#xff0c;它允许用户在浏览器中直接预览和探索3D模型。 以下是关于Online3DViewer的详细介绍&#xff1a; 一、基本概述 定义&#xff1a;Online3DViewer是一个在线3D模型查看器&#xff0c;支持多种3D文件格式&#xff0c;用…...

白骑士的Matlab教学实战项目篇 4.2 信号与图像处理项目

系列目录 上一篇&#xff1a;白骑士的Matlab教学实战项目篇 4.1 数据分析与可视化 信号处理和图像处理是 MATLAB 的重要应用领域&#xff0c;广泛应用于医学、工程、科学研究等领域。以下内容将介绍信号滤波与频域分析、图像增强与分割的基本概念和方法&#xff0c;并通过一个…...

复现、并改进open-mmlab的mmpose详细细节

复现open-mmlab的mmpose详细细节 1.配置环境2.数据处理3.训练4.改进mmpose4.1 快速调试技巧4.2 快速定位4.3 改进backbone4.3.1 使用说明4.3.2 改进案例4.3.2.1 复现mmpose原配置文件4.3.2.2 复现开源项目4.3.2.3 修改配置文件4.3.2.4 修改新模型 4.4 添加auxiliary_head4.4.1 …...

编写兼容Python2.x与3.x代码

编写兼容Python2.x与3.x代码 当我们正处于Python2.x到Python3.x的过渡期时&#xff0c;你可能想过是否可以在不修改任何代码的前提下能同时运行在Python2和3中。这看起来还真是一个合理的诉求&#xff0c;但如何开始呢&#xff1f;哪些Python2代码在3.x解释器执行时容易出状况…...

比特币8.12学习问题

疑问&#xff1a;什么是过滤&#xff0c;什么是offset 没有投钱的情况下&#xff0c;怎么用api 公式&#xff1a;单币分配金额 总资金 / 2/ offset/选币数量&#xff0c;其中2 表示多空 买入滑点&#xff08;Slippage&#xff09;是指在执行交易订单时&#xff0c;实际成交…...

解析 Vue 中的app.version、 app.provide 与 app.runWithContext :原理、应用与实例剖析

目录 app.provide app.runWithContext ​​​​​​​app.version 非 VIP 用户能够通过积分下载博文资源 app.provide 在 Vue 3.0 中,app.provide充当着在应用层级提供全局共享数据或者服务的关键角色。 app.provide(key, value) 这一方法接收两个关键参数,其中 …...

Ubuntu server 命令行跑selenium

背景 自动化测试都是在本机win上使用selenium 跑自动化脚本,但是服务器都是命令行的没有web界面 依赖包部署 apt-get install zlib1g-dev zlib1g## 安装谷歌浏览器 ## 跳到底部,选择其他平台 https://www.google.com/chrome/## ubuntu # dpkg -i google-chrome-stable_…...

刚刚,模糊测试平台SFuzz受到行业认可

近日&#xff0c;中国网络安全产业联盟&#xff08;CCIA&#xff09;正式发布了“2024年网络安全优秀创新成果大赛-安全严选专题赛”评选结果&#xff0c;开源网安模糊测试平台SFuzz凭借重大创新能力&#xff0c;得到组委会认可&#xff0c;获本次大赛创新产品优胜奖。 2024年网…...

数据结构与算法——DFS(深度优先搜索)

算法介绍&#xff1a; 深度优先搜索&#xff08;Depth-First Search&#xff0c;简称DFS&#xff09;是一种用于遍历或搜索树或图的算法。这种算法会尽可能深地搜索图的分支&#xff0c;直到找到目标节点或达到叶节点&#xff08;没有子节点的节点&#xff09;&#xff0c;然后…...

基于lambda简化设计模式

写在文章开头 本文将演示基于函数式编程的理念&#xff0c;优化设计模式中繁琐的模板化编码开发&#xff0c;以保证用尽可能少的代码做尽可能多的事&#xff0c;希望对你有帮助。 Hi&#xff0c;我是 sharkChili &#xff0c;是个不断在硬核技术上作死的 java coder &#xff…...

揭秘! 经纬恒润“车路云一体化”方案研发服务背后的科技驱动力

随着高级别智能驾驶技术的飞速发展&#xff0c;自动驾驶与路侧基础设施协同合作已成为行业内的又一热点。我国率先提出以“车路云一体化”为核心的战略布局&#xff0c;国家政策密集出台&#xff0c;地方试点积极推进&#xff0c;行业标准日趋完善&#xff0c;智能网联汽车“车…...

Redis操作--RedisTemplate(二)StringRedisTemplate

一、介绍 1、简介 由于存储在 Redis 中的 key 和 value 通常是很常见的 String 类型&#xff0c;Redis模块提供了 RedisConnection 和 RedisTemplate 的扩展&#xff0c;分是 StringRedisConnection 和 StringRedisTemplate&#xff0c;作为字符串操作的解决方案。 通过源码…...

【自动驾驶】ROS中自定义格式的服务通信,含命令行动态传参(c++)

目录 通信流程创建服务器端及客户端新建服务通讯文件修改service的xml及cmakelistCMakeLists.txt编辑 msg 相关配置编译消息相关头文件在cmakelist中包含头文件的路径在service包下编写service.cpp在client包下编写client.cpp测试运行查询服务的相关指令列出目前的所有服务&…...

优思学院|PDCA和DMAIC之间如何选择?

在现代组织中&#xff0c;提升方法、质量和效率是企业追求卓越、保持竞争力的核心目标。在这条道路上&#xff0c;DMAIC&#xff08;定义、测量、分析、改进、控制&#xff09;和PDCA&#xff08;计划、执行、检查、行动&#xff09;被广泛应用于持续改进和问题解决。这两者虽然…...

5 款最佳 Micro SD 卡恢复软件,助您恢复文件

您是否对数据恢复存在某些疑问&#xff0c;并想知道如何恢复 Micro SD 卡上的文件&#xff1f;如果是&#xff0c;那么在本文中您将找到答案。网上有许多专门用于从 Micro SD 卡或格式化的 Micro 卡恢复已删除文件而设计的软件。因此&#xff0c;在本文中&#xff0c;我们将向您…...

【使用教程】CiA402中的“原点回归模式”和“轮廓位置模式”搭配使用操作实例

使用“原点回归模式”配合“轮廓位置模式”是步进或伺服电机使用过程中最常用的方法&#xff0c;其对于提高自动化生产线的准确性和效率具有重要意义&#xff0c;本文将对正常使用控制电机中发送的命令及顺序进行简要说明。 说明&#xff1a;“原点回归”以“堵转回原点”的方式…...

服务器网络不通排查方案

服务器网络不通排查方案 最近遇到了服务器上服务已经启动&#xff0c;但是在浏览器上无法访问的问题&#xff0c;记录一下排查流程 文章目录 服务器网络不通排查方案netstart排查网络连接信息netstat 命令netstat -aptn 命令 iptables总结 netstart排查网络连接信息 netstat …...

Spring Boot + Vue 跨域配置(CORS)问题解决历程

在使用 Spring Boot 和 Vue 开发前后端分离的项目时&#xff0c;跨域资源共享&#xff08;CORS&#xff09;问题是一个常见的挑战。接下来&#xff0c;我将分享我是如何一步步解决这个问题的&#xff0c;包括中间的一些试错过程&#xff0c;希望能够帮助到正在经历类似问题的你…...