redis实现消息队列的几种方式
一、了解
众所周知,redis是我们日常开发过程中使用最多的非关系型数据库,也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列(大家自己下去研究吧~模式都是通用的),我们也能使用redis实现消息队列。因为其他中间件可能更适用于大型/企业级项目,在咱们项目前期不需要这么多的数据,redis跟我们也是高度集成的。这里就简化了技术栈。
二、常用的几种使用redis实现的消息队列方式
1、List数据结构
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
这里的列表大家可以想想为一个横着的通道,假设我现在往右边插入第一条数据,这个元素就会被放在最左边,接着再放入第二条数据,它就会在左边第二条,以此类推…插入了100条数据。 假设这个时候我要取出第一条,我就从最左边取就好。
这就变相实现了有序消息队列。具体实现大家自己研究
优点:操作方便,可以有序的取出自己插入的数据
缺点:不能进行实时消费,没有消费者
2、pub/sub 订阅消费模式
这就是传统的生产者->队列->消费者的模式。生产者的消息所有订阅者都能收到。
优点:实现了发布订阅模式,可以实时进行消费
缺点:没有消息持久化,在系统崩溃、宕机的时候;消息会丢失
3、sorted set有序集合Redis
有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。不同的是每一个元素都会关联一个double分数,redis就是通过分数为集合中的成员进行从大到小的排列。
有序集合的成员是唯一的,但是score是可以重复的。
生成消息直接往s-set中插入数据,将score设置为接收到数据的13位时间戳;需要使用的时候再根据score大小有序取出来就行了。
看到这里是不是大家能想到,既然每条消息都带有时间,那我是不是可以顺手实现延迟队列。
这里只需要将score设置为 接受消息的时间戳+延迟时间 。我在使用的时候获取当天时间戳的数据,这样就实现了延迟消息队列。
优点:操作方便,可以实现延迟队列
缺点:不能实时进行消费
4、stream流 (redis5.0版本以上才有 重点讲)
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
每个stream流都有自己的名称,它是redis的key,也可以理解为队列名称。
Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
stream常用命令
- XADD 定义stream流,写入消息体
XADD mystream * field1 A field2 B field3 C field4 D
mystream:自定义流名称
*:由redis生成流的id(也可以自定义,但是得保证自增唯一)
field1-A \field2-B\field3-C :保存的消息体,key-value形式-- 举例
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
- XDEL 删除消息
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-02) 1) "a"2) "1"
2) 1) 1538561701744-02) 1) "c"2) "3"
- XRANGE 获取消息队列数据
XRANGE key start end [COUNT count]key:strem流名称
start:开始值,- 表示最小值
end:结束值,+ 表示最大值-- 举例:
redis> XRANGE mystream - + 2
从mystrem全部数据中取出两条数据redis> XRANGE mystream + - 1
从mystream倒叙取一条数据
- XREVRANGE 自动过滤已删除的消息
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"2) 1) "name"2) "Ngozi"3) "surname"4) "Adichie"
redis>
- XREAD 阻塞或者非阻塞获取消息
# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"2) 1) 1) 1526984818136-02) 1) "duration"2) "1532"3) "event-id"4) "5"5) "user-id"6) "7782813"2) 1) 1526999352406-02) 1) "duration"2) "812"3) "event-id"4) "9"5) "user-id"6) "388234"
2) 1) "writers"2) 1) 1) 1526985676425-02) 1) "name"2) "Virginia"3) "surname"4) "Woolf"2) 1) 1526985685298-02) 1) "name"2) "Jane"3) "surname"4) "Austen"
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
- XGROUP CREATE 创建消费者组
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0 从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
以上就是常用的steam流的命令,大家下来自己测试,练习。
三、springboot整合redis stream流
java中提供了连接redis的客户端,jedis和lettuce、redistemplate;RedisTemplate 是 Spring Data Redis 提供的一个高级抽象层,封装了 Jedis 或 Lettuce 等底层客户端。
它提供了丰富的功能,如序列化、事务支持、键过期等。这里主要讲主流的redistemplate整合,大家以后能直接使用。
实时消费
实时消费顾名思义,生产者发送消息,消费者立马进行消费逻辑处理。
- RedisStreamUtils工具类,方便后续进行stream操作没根据自己项目需求来定义
@Configuration
@SuppressWarnings("all")
public class RedisStreamUtils {@Resourceprivate RedisTemplate<String, Object> redisTemplate;/*** 创建消费组** @param streamKey 键名称* @param group 组名称* @return {@link String}*/public String createGroup(String streamKey, String group) {return redisTemplate.opsForStream().createGroup(streamKey, group);}/*** 获取消费者信息** @param streamKey 键名称* @param group 组名称* @return {@link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String streamKey, String group) {return redisTemplate.opsForStream().consumers(streamKey, group);}/*** 查询组信息** @param streamKey 键名称* @return*/public StreamInfo.XInfoGroups queryGroups(String streamKey) {return redisTemplate.opsForStream().groups(streamKey);}// 添加Map消息public String addMap(String streamKey, Map<String, Object> value) {return Objects.requireNonNull(redisTemplate.opsForStream().add(streamKey, value)).getValue();}// 读取消息public List<MapRecord<String, Object, Object>> read(String streamKey) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(streamKey));}// 确认消费public Long ack(String streamKey, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(streamKey, group, recordIds);}// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}// 判断是否存在keypublic boolean hasKey(String key) {Boolean aBoolean = redisTemplate.hasKey(key);return aBoolean != null && aBoolean;}}
- RedisConfig配置文件
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RedisConfig {private final RedisStreamUtils redisStreamUtil;private final Environment environment;//消费者处理消息配置@Beanpublic Subscription subscription(RedisConnectionFactory factory) {AtomicInteger index = new AtomicInteger(1);//获取系统处理器数量 创建线程池,开启守护线程int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});//流消息监听容器参数设置 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5)//执行线程池.executor(executor)//阻塞消息读取(延迟消息).pollTimeout(Duration.ofSeconds(1))//异常处理.errorHandler(throwable -> {log.error("[MQ handler exception]", throwable);throwable.printStackTrace();}).build();//通过redis连接工厂,创建流消息监听容器var listenerContainer = StreamMessageListenerContainer.create(factory, options);//初始化流和消费者处理配置//初始化流和消费者处理配置Subscription subscription = initStreamAndConsumer(listenerContainer);//开启监听容器listenerContainer.start();return subscription;}private Subscription initStreamAndConsumer(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer){//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓//这一部分可以不用配置,可以根据自己的实际情况配置//该key和group可根据需求自定义配置String streamName = "mystream";String groupname = "mygroup";initStream(streamName, groupname);// 手动ask消息//消费者处理完消息之后,会进行确认;这里有一个pending状态会变成已处理Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumer(redisStreamUtil));// 自动ask消息/* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*///这一部分可以不用配置,可以根据自己的实际情况配置//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑return subscription;}private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);map.put("field", "value");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}
}
大家这里可以想一想,这种写法是不是符合生产过程中的创建队列/消费者的逻辑,是不是不方便。能不能在我需要的时候直接调用方法去创建???假设现在我新增了一个业务需求,需要用不同的业务逻辑去处理,而且我希望定制不同的消费者应答模式,这个时候就需要一个通用方法去实现,这里我是这样做的。还是在工具类中
创建redis流消息监听容器
主要参数 :定义线程池、一次最大获取消息数、超时重新获取、异常处理
@Beanpublic StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory factory) {log.info("redis ip:{},port:{}",environment.getProperty("spring.data.redis.host"),environment.getProperty("spring.data.redis.port"));AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5).executor(executor).pollTimeout(Duration.ofSeconds(3)).errorHandler(throwable -> {log.error("[MQ handler exception]", throwable);throwable.printStackTrace();}).build();return StreamMessageListenerContainer.create(factory, options);}//业务需求调用此方法即可
public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, StreamListener listener) {initStream(streamName, groupName);subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);}public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, RedisConsumer listener,Map<String,Object> recodMap) {initStream(streamName, groupName);subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);addMap(streamName, recodMap);}private void subscribeToStream(String streamName, String groupName, Consumer consumer, StreamListener listener) {StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = streamMessageListenerContainer(redisConnectionFactory);Subscription subscription = container.receive(Consumer.from(groupName, consumer.getName()),StreamOffset.create(streamName, ReadOffset.lastConsumed()), listener);//开始消息容器监听container.start();log.info("Subscribed to stream: {} with group: {} and consumer: {}", streamName, groupName, consumer);}
streamMessageListenerContainer中的 .batchSize(1) 设置需要着重说一下。意思是在消费者在监听到数据的时候,一次从redis中取出的多少条数据,假设我设置1,就意味着我的监听器会redis中取出1条未消费的数据,随后进入消费者逻辑,处理完毕之后返回;继续由监听器读取1条数据,在进入消费者逻辑;这个值设置得越小消息处理数据越快,但是也会增加redis链接的资源。
较大的 batchSize 可以减少与 Redis 服务器的交互次数,降低网络通信开销,提高处理效率。
较小的 batchSize 适用于需要低延迟处理的场景,但会增加网络通信开销和 CPU 使用率。
- RedisConsumer消费者
@Component("RedisConsumer")
@RequiredArgsConstructor
@Slf4j
public class RedisConsumer implements StreamListener<String, MapRecord<String,String,String>> {private final RedisStreamUtils redisStreamUtils;@Overridepublic void onMessage(MapRecord<String, String, String> message) {try {log.info("RedisConsumer1获取到了消息:{}",message);String streamKey = message.getStream();RecordId recordId = message.getId();Map<String, String> value = message.getValue();//获取这个流下 所有的消费者组StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);//处理逻辑//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);//手动确认ack消息,并删除已处理的消息//我这里使用手动xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));//自动确认消息 ---------自己下来研究//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑//根据业务场景来看是否需要删除消息
// redisStreamUtils.del(streamKey, recordId.getValue());} catch (Exception e) {throw new ServiceException("消费异常");}}
}
- RedisConsumer2消费者
@Component("RedisConsumer2")
@RequiredArgsConstructor
@Slf4j
public class RedisConsumer2 implements StreamListener<String, MapRecord<String,String,String>> {private final RedisStreamUtils redisStreamUtils;@Overridepublic void onMessage(MapRecord<String, String, String> message) {try {log.info("RedisConsumer2获取到了消息:{}",message);String streamKey = message.getStream();RecordId recordId = message.getId();Map<String, String> value = message.getValue();//获取这个流下 所有的消费者组StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);//处理逻辑//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑//手动确认ack消息,并删除已处理的消息xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
// redisStreamUtils.del(streamKey, recordId.getValue());} catch (Exception e) {throw new ServiceException("消费异常");}}
}
- RedisStreamcontroller模拟测试
@RequestMapping(value = "/redisStream")
@RestController
@RequiredArgsConstructor
@Slf4j
@SuppressWarnings("all")
public class RedisStreamController {private final RedisStreamUtils redisStreamUtils;private final RedisConsumer redisConsumer;private final RedisTemplate redisTemplate;private final ApplicationContext applicationContext;@GetMapping(value = "/addNewStreamAndSubscribe")public ResultVO addNewStreamAndSubscribe(@RequestParam("streamKey") String streamKey,@RequestParam("groupName") String groupName,@RequestParam("consumer")String consumer,@RequestParam("consumerClass") String consumerClass){try {// 获取实现类的实例StreamListener consumerInstance = (StreamListener) applicationContext.getBean(consumerClass);redisStreamUtils.addNewStreamAndSubscribe(streamKey, groupName, consumer,consumerInstance );} catch (Exception e) {throw new RuntimeException(e);}return ResultVO.success();}@GetMapping(value = "/addMap")public ResultVO addMap(@RequestParam("streamKey") String streamKey,@RequestParam("key")String key,@RequestParam("value")String value) {HashMap<String, Object> objectObjectHashMap = new HashMap<>();objectObjectHashMap.put(key,value);redisStreamUtils.addMap(streamKey,objectObjectHashMap);return ResultVO.success();}@GetMapping(value = "/getGroup")public ResultVO getGroup(@RequestParam("streamKey") String streamKey,@RequestParam("groupName") String groupName) {boolean b = redisStreamUtils.hasKey(streamKey);if(b){StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);List<Object> list = new ArrayList<>();for (StreamInfo.XInfoGroup xInfoGroup : xInfoGroups) {StreamInfo.XInfoConsumers xInfoConsumers = null;if(StrUtil.isNotEmpty(groupName)){xInfoConsumers = redisStreamUtils.queryConsumers(streamKey, groupName);for (StreamInfo.XInfoConsumer xInfoConsumer : xInfoConsumers) {log.info("group:{},pending:{},consumerCount:{},consumerName:{},lastDeliveryId:{}",xInfoGroup.groupName(),xInfoGroup.pendingCount(),xInfoGroup.consumerCount(),xInfoConsumer.consumerName(),xInfoGroup.lastDeliveredId());}}}}else{log.info("streamKey不存在:{}",streamKey);return ResultVO.error("streamKey不存在");}return ResultVO.success();}@GetMapping(value = "/delStream")public ResultVO delStream(@RequestParam("streamKey") String streamKey){redisTemplate.delete(streamKey);return ResultVO.success();}@GetMapping(value = "/readMsg")public ResultVO readMsg(@RequestParam("streamKey") String streamKey,@RequestParam("groupName") String groupName,@RequestParam("consumer") String consumer){// 读取消息,每次读取最多 5 条List read = redisTemplate.opsForStream().read(Consumer.from(groupName, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));return ResultVO.success(JSON.toJSONString(read));}
项目启动
调用/addNewStreamAndSubscribe接口
- 创建流、监听容器
- 消费者绑定流+消费者逻辑处理类
- 接收生产者消息方式(最新、偏移量)
- 开启消息容器监听
调用/addMap接口,发送消息
如果只有一个消费者,那么当消费者出现异常的时候,直到服务恢复,会从上一次消费的数据开始进行消费。
假设现在消费者组有两个消费者,都绑定了同一个消息流,这个时候发送消息就是轮询访问。
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息
…
如果consumer1出现了异常,这个时候consumer2会正常消费所有的数据。
stream本身就支持持久化数据,也是dbs和aof两种。不用担心数据丢失。
相关文章:

redis实现消息队列的几种方式
一、了解 众所周知,redis是我们日常开发过程中使用最多的非关系型数据库,也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列(大家自己下去研究吧~模式都是通用的),我们也能使用redis实现消息队列。…...
debian 系统更新升级
系统升级能够有效避免漏洞导致的损害 不过需要做好提前和后续的测试,避免现有运行程序的错误。 debian安装参考:链接 一、清理过时和不再使用的源 1.清理源 vi /etc/apt/sources.list2.在下面的文件夹下清理不需要的 cd /etc/apt/sources.list.d二、…...
STM32学习笔记-----UART的概念
在 STM32 中,UART(Universal Asynchronous Receiver/Transmitter)是一种常用的串行通信接口,广泛应用于嵌入式系统中。STM32 提供了丰富的硬件资源来支持 UART 通信,可以通过标准库(STM32 HAL 或者标准外设…...
Pytest-Bdd-Playwright 系列教程(9):datatable 参数的使用
Pytest-Bdd-Playwright 系列教程(9):datatable 参数的使用 前言一、什么是 datatable 参数?Gherkin 表格示例 二、datatable 参数的基本使用feature文件:获取用户信息并执行相关操作的使用 datatable 处理表格数据Give…...

【408】SDN重点笔记
总特征:数据平面(负责转发)与控制平面(负责控制)分离 控制平面: 由服务器和软件组成。控制平面完成转发表,并分发。 路由器不再需要路由选择协议,不再交换信息,只负责收到…...
云运维基础
笔记内容侵权联系删除 云审计(CTS) 云审计云上资源变更均可被管控,实时系统性记录所有人的操作,无需手工统计。云审计服务支持将操作记录合并,周期性地生成事件文件实时同步转存到OBS存储桶,帮助用户实现…...

基于微信小程序的平安驾校预约平台的设计与实现(源码+LW++远程调试+代码讲解等)
摘 要 互联网发展至今,广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。针对高校教师成果信息管理混乱,出错率高,信息安全性差,劳动强度大,费时费力…...

Rust开发一个命令行工具(一,简单版持续更新)
依赖的包 cargo add clap --features derive clap命令行参数解析 项目目录 代码 main.rs mod utils;use clap::Parser; use utils::{editor::open_in_vscode,fs_tools::{file_exists, get_file, is_dir, list_dir, read_file}, }; /// 在文件中搜索模式并显示包含它的行。…...

实战:深入探讨 MySQL 和 SQL Server 全文索引的使用及其弊端
在数据库中处理大量文本数据时,包含搜索(例如查找包含特定单词的文本)往往是必需的。然而,直接使用 LIKE %text% 的方式在大数据量中进行模糊查询会造成性能瓶颈。为了解决这一问题,MySQL 和 SQL Server 提供了全文索引(Full-Text Indexing)功能,可以显著加速文本数据的…...
情景2 虚拟化世界 自己答案的理解
1、什么是虚拟化? 答:版本很多,选了两个作为参考。 定义1:虚拟化是创造设备或者资源的虚拟版本,如服务器、存储设备、网络或者操作系统。 定义2:虚拟化是资源的逻辑表示,它不受物理限制的约束。 2、寄生…...
【国产操作系统对Qt支持有哪些?】
国产操作系统 鸿蒙操作系统:由华为开发,主要用于智能设备和物联网领域。 深度操作系统:基于Linux的操作系统,适用于个人电脑和服务器。 中标麒麟:由中国电子科技集团公司研发,适用于服务器和桌面环境。 悠然操作系统:面向教育和个人用户的Linux发行版。 红旗Linux:早期…...
深度学习--正则化
笔记内容侵权联系删 过拟合问题 过拟合问题描述:模型在训练集表现优异,但在测试集上表现较差。 根本原因:特征维度过多,模型假设过于复杂,参数过多,训练数据过少,噪声过多导致拟合出的函数几乎完美的对训练集做出预…...
PHP反序列化_1
目录 一、基本概念 1. 序列化 2. 反序列化 二、反序列化漏洞 1. 漏洞产生原因 2. 魔术方法 3.利用魔术方法进行攻击的示例: 一、基本概念 什么是 PHP 反序列化 PHP 反序列化是将序列化后的字符串恢复为原始 PHP 数据类型(如对象、数组等&#x…...
深度学习在图像识别中的应用
💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 深度学习在图像识别中的应用 深度学习在图像识别中的应用 深度学习在图像识别中的应用 引言 深度学习概述 定义与原理 发展历程 …...

SQL面试题——奔驰SQL面试题 车辆在不同驾驶模式下的时间
SQL面试题——奔驰SQL面试题 我们的表大致如下 CREATE TABLE signal_log( vin STRING COMMENTvehicle frame id, signal_name STRING COMMENTfunction name, signal_value STRING COMMENT signal value , ts BIGINT COMMENTevent timestamp, dt STRING COMMENTformat yyyy-mm…...

Leecode刷题C语言之统计好节点的数目
执行结果:通过 执行用时和内存消耗如下: 题目:统计好节点的数目 现有一棵 无向 树,树中包含 n 个节点,按从 0 到 n - 1 标记。树的根节点是节点 0 。给你一个长度为 n - 1 的二维整数数组 edges,其中 edges[i] [ai,…...

webpack5 + vue3 从零配置项目
前言 虽然在实际项目当中很少会从 0 到 1 配置一个项目,毕竟很多重复工作是没有必要的,脚手架将这些重复性的工作进行了整合,方便开发者使用。也正因如此,导致部分开发者过于依赖脚手架,却不清楚其内部的实现流程&…...

Queuing 表(buffer表)的优化实践 | OceanBase 性能优化实践
案例问题描述 该案例来自一个金融行业客户的问题:他们发现某个应用对一个数据量相对较小的表(仅包含数千条记录)访问时,频繁遇到性能下降的情况。为解决此问题,客户向我们求助进行分析。我们发现这张表有频繁的批量插…...
./mysqld: error while loading shared libraries: libaio.so.1: cannot open sha
mysql:5.6 使用离线方式安装:rpm -ivh --nodeps mysql* ,执行 systemctl start mysqld.service发现启动不了,通过vi /var/log/mysql.log看到如下关键字:libraries: libaio.so.1,之前也是按照网上帖子各种修改都没有解决…...

Qt主线程把数据发给子线程,主线程会阻塞吗
演示: #include <QCoreApplication> #include <QThread> #include <QObject> #include <QDebug>// 子线程类 class Worker : public QObject {Q_OBJECT public slots:void processData(int data) {qDebug() << "Processing dat…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...

论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing
Muffin 论文 现有方法 CRADLE 和 LEMON,依赖模型推理阶段输出进行差分测试,但在训练阶段是不可行的,因为训练阶段直到最后才有固定输出,中间过程是不断变化的。API 库覆盖低,因为各个 API 都是在各种具体场景下使用。…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
Pydantic + Function Calling的结合
1、Pydantic Pydantic 是一个 Python 库,用于数据验证和设置管理,通过 Python 类型注解强制执行数据类型。它广泛用于 API 开发(如 FastAPI)、配置管理和数据解析,核心功能包括: 数据验证:通过…...