当前位置: 首页 > news >正文

RocketMQ 消息消费 轮询机制 PullRequestHoldService

1. 概述

先来看看 RocketMQ 消费过程中的轮询机制是啥。首先需要补充一点消费相关的前置知识。

1.1 消息消费方式

RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式

  • Pull 模式:用户自己进行消息的拉取和消费进度的更新
  • Push 模式:Broker 将新的消息自动发送给用户进行消费

1.2 Push 消费模式

我们一般使用 RocketMQ 时用的是 Push 模式,因为比较方便,不需要手动拉取消息和更新消费进度。

那么你有没有想过 Push 模式是如何做到能够立即消费新的消息?

1.2.1 Push 模式原理

实际上,在 Push 消费时,消费者是在不断轮询 Broker,询问是否有新消息可供消费。一旦有新消息到达,马上拉取该消息。也就是说 Push 模式内部也用了 Pull 消息的模式,这样就可以立即消费到最新的消息。

1.3 如何进行轮询?

那么 Push 模式或 Pull 模式如何进行消息的查询?

能够想到的比较笨的方法是,每隔一定的时间(如1ms)就向 Broker 发送一个查询请求,如果没有新消息则立刻返回。可想而知这种方法非常浪费网络资源。

RocketMQ 为了提高网络性能,在拉取消息时如果没有新消息,不会马上返回,而是会将该查询请求挂起一段时间,然后再重试查询。如果一直没有新消息,直到轮询时间超过设定的阈值才会返回。

根据轮询设定的超时阈值大小的不同,RocketMQ 有两种轮询方式,分别为长轮询(默认)和短轮询。

1.4 长轮询和短轮询

RocketMQ 的 Broker 端参数 longPollingEnable 可以配置轮询方式,默认为 true

  • 短轮询:longPollingEnable=false,轮询时间为 shortPollingTimeMills ,默认为 1s
  • 长轮询:longPollingEnable=true,轮询时间为 5s。拉取请求挂起时间:受 DefaultMQPullConsumerbrokerSuspendMaxTimeMillis 控制,默认push模式固定15s,pull模式固定20s。

2. 概要流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b6pQzSWr-1646145661686)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_activity.drawio.svg)]

根据上面的活动图来看一下 RocketMQ 消费时的轮询机制流程

  1. Consumer 发送拉取消息请求
  2. Broker 收到请求后交给请求处理模块处理
  3. 尝试从存储的消息中拉取消息
  4. 如果能够拉取消息,那么将拉取到的消息直接返回
  5. 如果没有拉取到消息,那么根据 Broker 是否支持挂起和是否开启长轮询来判断是否要进行轮询以及进行哪种轮询。
    1. 如果支持挂起,那么会将该拉取请求挂起
    2. 长轮询等待 5s
    3. 短轮询等待 1s
  6. 检查消费队列中是否有新消息到达,如果没有则继续等待,以此循环。如果有新消息,处理挂起的拉取消息请求并返回消费者。
  7. 如果没有新消息到达,轮询后会检查每个挂起的拉取请求的挂起时间是否超过挂起时间阈值,如果超过那么也会直接返回消费者,否则继续循环进行轮询操作。


那么按照上述流程,开启长轮询的情况下,如果一次轮询没有找到消息,要等待 5s 才能进行下一次查询。如果这 5s 当中有新的消息存入,如何保证能够立刻消费到?

解决方案不难想到,就是新的消息写入后,主动进行通知,让挂起的拉取请求立刻进行拉取操作。

RocketMQ 就是这么做的,在消息存入 CommitLog 后的 doReput 方法中,会判断是否是长轮询,如果是则会发送一个通知,让挂起的拉取请求立刻进行处理。

3. 详细流程

3.1 涉及到的类

3.1.1 PullMessageProcessor

该类是 Broker 处理 Consumer 拉取清求的入口类。当 Broker 收到 Consumer 发送的拉取请求时,调用该类的 processRequest 方法

3.1.2 PullRequestHoldService

长轮询请求管理线程,挂起的拉取请求会在这里进行保存。每等待一段时间(长轮询/短轮询等待时间)会检查挂起的请求中是否有可以进行拉取的数据。

3.1.3 DefaultMessageStore#ReputMessageService

该线程负责将存储到 CommitLog 的消息重新转发,用以生成 ConsumeQueue 和 IndexFile 索引。在生成索引之后,会向长轮询线程发送提醒,立刻唤醒相应队列的拉取请求,执行消息拉取。

3.2 时序图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-brvsznhE-1646145661687)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_sequence.drawio.svg)]

着重体现了长轮询逻辑,其他逻辑有所省略

  1. 消费者调用 pullKernelImpl() 发送拉取请求,调用时用 brokerSuspendMaxTimeMillis 指定了 Broker 挂起的最长时间,默认为 20s
  2. Broker 中 PullMessageProcess 处理拉取请求,从 ConsumeQueue 中查询消息
  3. 如果没有查询到消息,判断是否启用长轮询,调用 PullRequestHoldService#suspendPullRequest() 方法将该请求挂起
  4. PullRequestHoldService 线程 run() 方法循环等待轮询时间,然后周期性调用 checkHoldRequest() 方法检查挂起的请求是否有消息可以拉取
  5. 如果检查到有新消息可以拉取,调用 notifyMessageArriving() 方法
  6. ReputMessageService 的 doReput() 如果被调用,说明也有新消息到达,需要唤醒挂起的拉取请求。这里也会发送一个 notify,进而调用 notifyMessageArriving() 方法
  7. notifyMessageArriving() 方法中也会查询 ConsumeQueue 的最大 offset,如果确实有新消息,那么将唤醒对应的拉取请求,具体的方法是调用 executeRequestWhenWakeup() 方法
  8. executeRequestWhenWakeup() 方法唤醒拉取请求,调用 processRequest() 方法处理该请求

3.3 每个类的具体逻辑

3.3.1 PullMessageProcessor

Broker 处理 Consumer 拉取清求的入口类

  • RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request):处理 Consumer 拉取请求的入口方法,收到 Consumer 拉取请求时调用。该方法主要完成如下操作

    1. 校验
    2. 消息过滤
    3. 从存储中查询消息
    4. 返回响应给 Consumer

    如果从存储中没有查询到消息,会将响应码设置为 ResponseCode.PULL_NOT_FOUND,并且启动长轮询

  • void executeRequestWhenWakeup(Channel channel, final RemotingCommand request):将 Hold 的拉取请求唤醒,再次拉取消息

    • 该方法在长轮询收到新消息时调用,立即唤醒挂起的拉取请求,然后对这些请求调用 processRequest 方法
    • 何时需要提醒长轮询新消息已经到达?上面说到,在长轮询等待时如果有新消息到达,CommitLogdoReput 方法中会进行提醒,最终会调用 executeRequestWhenWakeup 方法

3.3.2 PullRequestHoldService

该服务线程会从 pullRequestTable 本地缓存变量中取PullRequest请求,检查轮询条件“待拉取消息的偏移量是否小于消费队列最大偏移量”是否成立,如果条件成立则说明有新消息达到Broker端,则通过PullMessageProcessor的executeRequestWhenWakeup()方法重新尝试发起Pull消息的RPC请求

  • pullRequestTable

    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest/* 同一队列积累的拉取请求 */> pullRequestTable = new ConcurrentHashMap<>(1024)
    

    上面是挂起的消息拉取请求容器,它是一个 ConcurrentHashMap,key 是拉取请求的队列,value 是该队列挂起的所有拉取请求。其中 ManyPullRequest 底层是一个 ArrayList,它的 add 方法加了锁。

  • suspendPullRequest(String topic, int queueId, PullRequest pullRequest):将 Consumer 拉取请求暂时挂起,会将请求加入到 pullRequestTable

  • checkHoldRequest():检查所有挂起的拉取请求,如果有数据满足要求,就唤醒该请求,对其执行 PullMessageProcessor#processRequest 方法

  • run():线程主循环,每等待一段时间就调用 checkHoldRequest() 方法检查是否有请求需要唤醒。等待的时间根据长轮询/短轮询的配置决定,长轮询等待 5s,短轮询默认等待 1s

  • notifyMessageArriving():被 checkHoldRequest()ReputMessageService#doReput() 调用,表示新消息到达,唤醒对应队列挂起的拉取请求

3.3.3 DefaultMessageStore#ReputMessageService

该服务线程 doReput() 方法会在 Broker 端不断地从数据存储对象 CommitLog 中解析数据并分发请求,随后构建出 ConsumeQueue(逻辑消费队列)和 IndexFile(消息索引文件)两种类型的数据。

同时从本地缓存变量 PullRequestHoldService#pullRequestTable 中,取出挂起的拉起请求并执行。

4. 源码解析

4.1 PullMessageProcessor

4.1.1 processRequest

如果从存储中没有查询到消息,会将响应码设置为 ResponseCode.PULL_NOT_FOUND,并且启动长轮询

以下三种情况会将响应码设置为ResponseCode.PULL_NOT_FOUND

  1. NO_MESSAGE_IN_QUEUE:消费队列中没有任何消息
  2. OFFSET_FOUND_NULL:offset未找到任何数据
  3. OFFSET_OVERFLOW_ONE:待拉取偏移量等于队列最大偏移量

/*** 处理客户端请求入口** @param channel 网络通道,通过该通道向消息拉取客户端发送响应结果* @param request 消息拉取请求* @param brokerAllowSuspend Broker端是否允许挂起,默认true。true:如果未找到消息则挂起。false:未找到消息直接返回消息未找到* @return 响应* @throws RemotingCommandException 当解析请求发生异常时*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {// ...switch (response.getCode()) {// ...// 如果从消费队列中未找到新的可以拉取的消息,判断并挂起该拉取请求case ResponseCode.PULL_NOT_FOUND:// 长轮询if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}// ...
}

4.1.2 executeRequestWhenWakeup

在PullMessageProcessor的executeRequestWhenWakeup()方法中,通过业务线程池pullMessageExecutor,异步提交重新Pull消息的请求任务,即为重新调了一次PullMessageProcessor业务处理器的processRequest()方法,来实现Pull消息请求的二次处理)。

/*** 将Hold的拉取请求唤醒,再次拉取消息* 该方法调用线程池,因此,不会阻塞** @param channel 通道* @param request Consumer拉取请求* @throws RemotingCommandException 当远程调用发生异常*/
public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {// 处理Consumer拉取请求,获取返回体final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {// 将返回体写入channel,返回给Consumerchannel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};// 异步执行请求处理和返回this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}

4.2 PullRequestHoldService

4.2.1 suspendPullRequest

/*** 挂起(保存)客户端请求,当有数据的时候触发请求** @param topic 主题* @param queueId 队列编号* @param pullRequest 拉取消息请求*/
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {// 根据topic和queueId构造map的keyString key = this.buildKey(topic, queueId);// map的key如果为空,创建一个空的request队列,填充key和valueManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}// 保存该次Consumer拉取请求mpr.addPullRequest(pullRequest);
}

4.2.2 checkHoldRequest

/*** 检查所有已经挂起的长轮询请求* 如果有数据满足要求,就触发请求再次执行*/
private void checkHoldRequest() {// 遍历拉取请求容器中的每个队列for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 从store中获取队列的最大偏移量final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {// 根据store中获取的最大偏移量,判断是否有新消息到达,如果有则执行拉取请求操作this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}
}

4.2.3 run

@Override
public void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {// 等待一定时间if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {// 开启长轮询,每5s判断一次消息是否到达this.waitForRunning(5 * 1000);} else {// 未开启长轮询,每1s判断一次消息是否到达this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();// 检查是否有消息到达,可以唤醒挂起的请求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());
}

4.2.4 notifyMessageArriving

这个方法在两个地方被调用,如下图所示

Untitled

这个方法是重新唤醒拉取请求的核心方法。调用这个方法,提醒 PullRequestHoldService 线程有新消息到达

我们来看看这个方法具体做了什么

  1. 根据 topic 和 queueId 获取挂起的拉取请求列表
  2. 从 store 中获取该队列消息的最大offset
  3. 遍历该队列的所有拉取请求,符合以下两种条件之一的拉取请求会被处理并返回
    1. 消费队列最大offset比消费者拉取请求的offset大,说明有新的消息可以被拉取,处理该拉取请求
    2. 拉取请求挂起时间超过阈值,直接返回消息未找到
  4. 如果不满足以上两个条件,那么该拉取请求会重新放回 pullRequestTable,等待下次检查

/*** 当有新消息到达的时候,唤醒长轮询的消费端请求** @param topic     消息Topic* @param queueId   消息队列ID* @param maxOffset 消费队列的最大Offset*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 根据topic和queueId从容器中取出挂起的拉取请求列表String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {// 获取挂起的拉取请求列表List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {// 预先定义需要继续挂起的拉取请求列表List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;// 从store中获取该队列消息的最大offsetif (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 消费队列最大offset比消费者拉取请求的offset大,说明有新的消息可以被拉取if (newestOffset > request.getPullFromThisOffset()) {// 消息过滤匹配boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 会调用PullMessageProcessor#processRequest方法拉取消息,然后将结果返回给消费者this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 查看是否超时,如果Consumer请求达到了超时时间,也触发响应,直接返回消息未找到if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}// 当前不满足要求,重新放回Hold列表中replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}
}

4.3 DefaultMessageStore#ReputMessageService

4.3.1 doReput

private void doReput() {// ...DefaultMessageStore.this.doDispatch(dispatchRequest);// 通知消息消费长轮询线程,有新的消息落盘,立即唤醒挂起的消息拉取请求if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}

这里调用了 NotifyMessageArrivingListener#arriving() 方法,进而调用 PullRequestHoldService.notifyMessageArriving()。

为什么不直接调用 pullRequestHoldService.notifyMessageArriving() ?因为 doReput 所处的类所在的包是 store,存储包,而 PullRequestHoldService 在 broker 包中

所以需要一个桥梁,就是 NotifyMessageArrivingListener。它在 Broker 初始化 DefaultMessageStore 时被写入 DefaultMessageStore

4.3.2 NotifyMessageArrivingListener#arriving

public class NotifyMessageArrivingListener implements MessageArrivingListener {@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 提醒长轮询请求管理容器,新的消息到达,立刻拉取最新消息this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties);}
}

参考资料

  • 源码分析RocketMQ消息PULL-长轮询模式
  • 消息中间件—RocketMQ 消息消费(二)(push 模式实现)

相关文章:

RocketMQ 消息消费 轮询机制 PullRequestHoldService

1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥。首先需要补充一点消费相关的前置知识。 1.1 消息消费方式 RocketMQ 支持多种消费方式&#xff0c;包括 Push 模式和 Pull 模式 Pull 模式&#xff1a;用户自己进行消息的拉取和消费进度的更新Push 模式&#xff1a;Broker…...

springboot 数据库版本升级管理常用解决方案

目录 一、前言 1.1 单独执行初始化sql 1.2 程序自动执行 二、数据库版本升级管理问题 三、spring 框架sql自动管理机制 3.1 jdbcTemplate 方式 3.1.1 创建数据库 3.1.2 创建 springboot 工程 3.1.3 初始化sql脚本 3.1.4 核心配置类 3.1.5 执行sql初始化 3.2 配置文…...

78. 子集

题目描述 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[],[1],[2…...

Mask RCNN网络结构以及整体流程的详细解读

文章目录 1、概述2、Backbone3、RPN网络3.1、anchor的生成3.2、anchor的标注/分配3.3、分类预测和bbox回归3.4、NMS生成最终的anchor 4、ROI Head4.1、ROI Align4.2、cls head和bbox head4.3、mask head 1、概述 Mask RCNN是在Faster RCNN的基础上增加了mask head用于实例分割…...

Android Framework底层原理之WMS的启动流程

一 概述 今天&#xff0c;我们介绍 WindowManagerService&#xff08;后续简称 WMS&#xff09;的启动流程&#xff0c;WMS 是 Android 系统中&#xff0c;负责窗口显示的的服务。在 Android 中它也起着承上启下的作用。 如下图&#xff0c;就是《深入理解 Android》书籍中的…...

Leaflet入门,Leaflet加载xyz地图,以vue-leaflet插件加载高德地图为例

前言 本章介绍Leaflet使用vue2-leaflet或者vue-leaflet插件方式便捷加载xyz高德地图。 # 效果演示 vue如何使用Leaflet vue2如何使用:《Leaflet入门,如何使用vue2-leaflet实现vue2双向绑定式的使用Leaflet地图,以及初始化后拿到leaflet对象,方便调用leaflet的api》 vue3…...

【ARM Cache 系列文章 8 -- ARM DynamIQ 技术介绍

文章目录 DynamIQ 技术背景DynamIQ技术详解DynamIQ 与 big.LITTLEDynamIQ cluster 分类硬件支持 DynamIQ为什么适合人工智能&#xff1f; DynamIQ 技术背景 2017年3月21日下午&#xff0c;ARM在北京金隅喜来登酒店召开发布会&#xff0c;正式发布了全新的有针对人工智能及机器…...

24届近5年南京大学自动化考研院校分析

今天给大家带来的是南京大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、南京大学 学校简介 南京大学是一所历史悠久、声誉卓著的高等学府。其前身是创建于1902年的三江师范学堂&#xff0c;此后历经两江师范学堂、南京高等师范学校、国立东南大学、国立第四中…...

微信小程序(原生)和uniapp预览电子文件doc/pdf/ppt/excel等

微信小程序原生预览文件 function previewFile(value) {const fileExtName ${value.ext};const randFile new Date().getTime() fileExtName;uni.showLoading({title: 加载中...})wx.downloadFile({url: value.url, // 文件的本身urlfilePath: wx.env.USER_DATA_PATH / r…...

【前端 | CSS】align-items与align-content的区别

align-items 描述 CSS align-items 属性将所有直接子节点上的 align-self 值设置为一个组。align-self 属性设置项目在其包含块中在交叉轴方向上的对齐方式 align-items是针对每一个子项起作用&#xff0c;它的基本单位是每一个子项&#xff0c;在所有情况下都有效果&…...

Go语言入门

Go语言入门 简介 Go是一门由Google开发的开源编程语言&#xff0c;旨在提供高效、可靠和简洁的软件开发工具。Go具有静态类型、垃圾回收、并发性和高效编译的特点&#xff0c;适用于构建可扩展的网络服务和系统工具。本文将介绍Go语言的基础知识和常用功能&#xff0c;并通过…...

Python学习笔记第五十五天(Pandas CSV文件)

Python学习笔记第五十五天 Pandas CSV 文件read_csv()to_string()to_csv() 数据处理head()tail()fillna() info() 后记 Pandas CSV 文件 CSV&#xff08;Comma-Separated Values&#xff0c;逗号分隔值&#xff0c;有时也称为字符分隔值&#xff0c;因为分隔字符也可以不是逗号…...

自然语言处理: 第七章GPT的搭建

自然语言处理: 第七章GPT的搭建 理论基础 在以transformer架构为框架的大模型遍地开花后&#xff0c;大模型的方向基本分成了三类分别是: decoder-only架构 , 其中以GPT系列为代表encoder-only架构&#xff0c;其中以BERT系列为代表encoder-decoder架构&#xff0c;标准的tr…...

【奶奶看了都会】2分钟学会制作最近特火的ikun幻术图

1.效果展示 最近ikun幻术图特别火啊&#xff0c;在网上能找到各种各样的ikun姿势图片&#xff0c;这些图片都是AI绘制的&#xff0c;能和风景完美融合在一起&#xff0c;今天小卷就来教大家怎么做这种图片 先看看图片效果 视频链接&#xff1a; 仿佛见到一位故人&#xff0c;…...

【深度学习】【风格迁移】Zero-shot Image-to-Image Translation

论文&#xff1a;https://arxiv.org/abs/2302.03027 代码&#xff1a;https://github.com/pix2pixzero/pix2pix-zero/tree/main 文章目录 Abstract1. Introduction相关工作3. Method Abstract 大规模文本到图像生成模型展示了它们合成多样且高质量图像的显著能力。然而&#x…...

Day 30 C++ STL 常用算法(上)

文章目录 算法概述常用遍历算法for_each——实现遍历容器函数原型示例 transform——搬运容器到另一个容器中函数原型注意示例 常用查找算法find——查找指定元素函数原型示例 find_if—— 查找符合条件的元素函数原型示例 adjacent_find——查找相邻重复元素函数原型示例 bina…...

MES系统在机器人行业生产管理种的运用

机器人的智能水平也伴随技术的迭代不断攀升。 2021年的春晚舞台上&#xff0c;来自全球领先工业机器人企业abb的全球首款双臂协作机器人yumi&#xff0c;轻松自如地表演了一出写“福”字&#xff0c;赢得了全国观众的赞叹。 在汽车装配领域&#xff0c;一台机器人可以自主完成一…...

Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

目录 0. 相关文章链接 1. 输出的选项 2. 输出模式(output mode) 2.1. Append 模式(默认) 2.2. Complete 模式 2.3. Update 模式 2.4. 输出模式总结 3. 输出接收器(output sink) 3.1. file sink 3.2. kafka sink 3.2.1. 以 Streaming 方式输出数据 3.2.2. 以 batch …...

【云原生】Docker 详解(一):从虚拟机到容器

Docker 详解&#xff08;一&#xff09;&#xff1a;从虚拟机到容器 1.虚拟化 要解释清楚 Docker&#xff0c;首先要解释清楚 容器&#xff08;Container&#xff09;的概念。要解释容器的话&#xff0c;就需要从操作系统说起。操作系统太底层&#xff0c;细说的话一两本书都说…...

代码随想录第48天 | 198. 打家劫舍、213. 打家劫舍II、337. 打家劫舍III

198. 打家劫舍 当前房屋偷与不偷取决于 前一个房屋和前两个房屋是否被偷了。 递归五部曲&#xff1a; dp[i]&#xff1a;考虑下标i&#xff08;包括i&#xff09;以内的房屋&#xff0c;最多可以偷窃的金额为dp[i]。决定dp[i]的因素就是第i房间偷还是不偷。 如果偷第i房间&…...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展&#xff0c;AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术&#xff0c;在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...