当前位置: 首页 > news >正文

Flink 中kafka broker缩容导致Task一直重启

背景

Flink版本 1.12.2
Kafka 客户端 2.4.1
在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker,而当时flink配置了12台kafka broker),当时具体的现场如下:

JobManaer上的日志如下:
2023-10-07 10:02:52.975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)对应的 TaskManager(task-xxxx-shanghai.emr.aliyuncs.com)上的日志如下:2023-10-07 10:02:24.604 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.2023-10-07 10:02:52.939 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected2023-10-07 10:08:15.541 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

当时Flink中kafka source的相关配置如下:

scan.topic-partition-discovery.interval  300000
restart-strategy.type fixed-delay
restart-strategy.fixed-delay.attempts 50000000
jobmanager.execution.failover-strategy region

结论以及解决

目前在kafka 消费端有两个参数default.api.timeout.ms(默认60000),request.timeout.ms(默认30000),这两个参数来控制kakfa的客户端从服务端请求超时,也就是说每次请求的超时时间是30s,超时之后可以再重试,如果在60s内请求没有得到任何回应,则会报TimeOutException,具体的见如下分析,
我们在flink kafka connector中通过设置如下参数来解决:

`properties.default.api.timeout.ms` = '600000',
`properties.request.timeout.ms` = '5000',
// max.block.ms是设置kafka producer的超时
`properties.max.block.ms` = '600000',

分析

在Flink中对于Kafka的Connector的DynamicTableSourceFactoryKafkaDynamicTableFactory,这里我们只讨论kafka作为source的情况,
而该类的方法createDynamicTableSource最终会被调用,至于具体的调用链可以参考Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用–只不过把Sink改成Source就可以了,所以最终会到KafkaDynamicSource类:

@Overridepublic ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {final DeserializationSchema<RowData> keyDeserialization =createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);final DeserializationSchema<RowData> valueDeserialization =createDeserialization(context, valueDecodingFormat, valueProjection, null);final TypeInformation<RowData> producedTypeInfo =context.createTypeInformation(producedDataType);final FlinkKafkaConsumer<RowData> kafkaConsumer =createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);return SourceFunctionProvider.of(kafkaConsumer, false);}

该类的getScanRuntimeProvider方法会被调用,所有kafka相关的操作都可以追溯到FlinkKafkaConsumer类(继承FlinkKafkaConsumerBase)中,对于该类重点的方法如下:

    @Overridepublic final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));... }@Overridepublic void open(Configuration configuration) throws Exception {// determine the offset commit modethis.offsetCommitMode =OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());// create the partition discovererthis.partitionDiscoverer =createPartitionDiscoverer(topicsDescriptor,getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks());this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>();final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();if (restoredState != null) {...} else {// use the partition discoverer to fetch the initial seed partitions,// and set their initial offsets depending on the startup mode.// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily// determined// when the partition is actually read.switch (startupMode) {。。。default:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}}if (!subscribedPartitionsToStartOffsets.isEmpty()) {switch (startupMode) {...case GROUP_OFFSETS:LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets.size(),subscribedPartitionsToStartOffsets.keySet());}} else {LOG.info("Consumer subtask {} initially has no partitions to read from.",getRuntimeContext().getIndexOfThisSubtask());}}this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));}@Overridepublic void run(SourceContext<T> sourceContext) throws Exception {if (subscribedPartitionsToStartOffsets == null) {throw new Exception("The partitions were not set for the consumer");}// initialize commit metrics and default offset callback methodthis.successfulCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);this.failedCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();this.offsetCommitCallback =new KafkaCommitCallback() {@Overridepublic void onSuccess() {successfulCommits.inc();}@Overridepublic void onException(Throwable cause) {LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.",subtaskIndex),cause);failedCommits.inc();}};// mark the subtask as temporarily idle if there are no initial seed partitions;// once this subtask discovers some partitions and starts collecting records, the subtask's// status will automatically be triggered back to be active.if (subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}LOG.info("Consumer subtask {} creating fetcher with offsets {}.",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets);// from this point forward://   - 'snapshotState' will draw offsets from the fetcher,//     instead of being built from `subscribedPartitionsToStartOffsets`//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to//     Kafka through the fetcher, if configured to do so)this.kafkaFetcher =createFetcher(sourceContext,subscribedPartitionsToStartOffsets,watermarkStrategy,(StreamingRuntimeContext) getRuntimeContext(),offsetCommitMode,getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),useMetrics);if (!running) {return;}if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {kafkaFetcher.runFetchLoop();} else {runWithPartitionDiscovery();}}@Overridepublic final void snapshotState(FunctionSnapshotContext context) throws Exception {...HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}... }}@Overridepublic final void notifyCheckpointComplete(long checkpointId) throws Exception {...fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);...}

主要是initializeStateopen,run,snapshotState,notifyCheckpointComplete这四个方法,下面带着问题逐一介绍一下:
注意:对于initializeStateopen方法的先后顺序,可以参考StreamTask类,其中如下的调用链:

invoke()||\/
beforeInvoke()||\/
operatorChain.initializeStateAndOpenOperators||\/
FlinkKafkaConsumerBase.initializeState||\/
FlinkKafkaConsumerBase.open

就可以知道 initializeState方法的调用是在open之前的

initializeState方法

这里做的事情就是从持久化的State中恢复kafkaTopicOffset信息,我们这里假设是第一次启动

open方法

  • offsetCommitMode
    offsetCommitMode = OffsetCommitModes.fromConfiguration 这里获取设置的kafka offset的提交模式,这里会综合enable.auto.commit的配置(默认是true),enableCommitOnCheckpoints默认是true,checkpointing设置为true(默认是false),综合以上得到的值为OffsetCommitMode.ON_CHECKPOINTS
  • partitionDiscoverer
    这里主要是进行kafka的topic的分区发现,主要路程是 partitionDiscoverer.discoverPartitions,这里的涉及的流程如下:
    AbstractPartitionDiscoverer.discoverPartitions||\/
    AbstractPartitionDiscoverer.getAllPartitionsForTopics ||\/
    KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor||\/
    KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //这里的defaultApiTimeoutMs 来自于*default.api.timeout.ms*||\/
    Fetcher.getTopicMetadata //这里面最后抛出 new TimeoutException("Timeout expired while fetching topic metadata");||\/
    Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //这里会根据某种策略选择配置的broker的节点||\/
    client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 这里的 *defaultRequestTimeoutMs* 来自配置*request.timeout.ms*
    综上所述,discoverPartitions做的就是根据某种策略选择配置的broker节点,对每个节点进行请求,request.timeout.ms超时后,再根据策略选择broker,直至总的时间达到了配置的default.api.timeout.ms,这里默认default.api.timeout.ms 为60秒,request.timeout.ms为30秒
  • subscribedPartitionsToStartOffsets
    根据startupMode模式,默认是StartupMode.GROUP_OFFSETS(默认从上次消费的offset开始消费),设置开启的kafka offset,这在kafkaFetcher中会用到

run方法

  • 设置一些指标successfulCommits/failedCommits
  • KafkaFetcher
    这里主要是从kafka获取数据以及如果有分区发现则循环进kafka的topic分区发现,这里会根据配置scan.topic-partition-discovery.interval默认配置为0,实际中设置的为300000,即5分钟。该主要的流程为在方法runWithPartitionDiscovery:
      private void runWithPartitionDiscovery() throws Exception {final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();createAndStartDiscoveryLoop(discoveryLoopErrorRef);kafkaFetcher.runFetchLoop();// make sure that the partition discoverer is waked up so that// the discoveryLoopThread exitspartitionDiscoverer.wakeup();joinDiscoveryLoopThread();// rethrow any fetcher errorsfinal Exception discoveryLoopError = discoveryLoopErrorRef.get();if (discoveryLoopError != null) {throw new RuntimeException(discoveryLoopError);}}
    • createAndStartDiscoveryLoop 这个会启动单个线程以while sleep方式实现以scan.topic-partition-discovery.interval为间隔来轮询进行Kafka的分区发现,注意这里会吞没Execption,并不会抛出异常

       private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {discoveryLoopThread =new Thread(...while (running) {...try {discoveredPartitions =partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException| AbstractPartitionDiscoverer.ClosedException e) {break;}if (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}if (running && discoveryIntervalMillis != 0) {try {Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {break;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}},"Kafka Partition Discovery for "+ getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start();
      }
      

      这里的kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);subscribedPartitionStates变量会把发现分区信息保存起来,这在kafkaFetcher.runFetchLoop中会设置已经提交的offset信息,并且会在snapshotState会用到

    • kafkaFetcher.runFetchLoop 这里会从kafka拉取数据,并设置kafka的offset,具体的流程如下:

       runFetchLoop ||\/subscribedPartitionStates 这里会获取*subscribedPartitionStates*变量||\/partitionConsumerRecordsHandler||\/emitRecordsWithTimestamps||\/emitRecordsWithTimestamps||\/partitionState.setOffset(offset);
      

      这里的offset就是从消费的kafka记录中获取的

snapshotState方法

这里会对subscribedPartitionStates中的信息进行处理,主要是加到pendingOffsetsToCommit变量中

  • offsetCommitMode
    这里上面说到是OffsetCommitMode.ON_CHECKPOINTS,如果是ON_CHECKPOINTS,则会从fetcher.snapshotCurrentState获取subscribedPartitionStates
    并加到pendingOffsetsToCommit,并持久化到unionOffsetStates中,这实际的kafka offset commit操作在notifyCheckpointComplete中,

notifyCheckpointComplete方法

获取到要提交的kafka offset信息,并持久化保存kafka中

参考

  • open 和 initailizeState的初始化顺序
  • A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata

相关文章:

Flink 中kafka broker缩容导致Task一直重启

背景 Flink版本 1.12.2 Kafka 客户端 2.4.1 在公司的Flink平台运行了一个读Kafka计算DAU的流程序&#xff0c;由于公司Kafka的缩容&#xff0c;直接导致了该程序一直在重启&#xff0c;重启了一个小时都还没恢复&#xff08;具体的所容操作是下掉了四台kafka broker&#xff0…...

纯前端js中使用sheetjs导出excel,并且合并标题

先定义变量----用的是Vue2 &#xff0c;以下在vue的data&#xff1a;{}中定义--------------//空格占位符 headerTopTitle: [患者信息, , , , , , , , , 入出院信息, , , , , , , 病案首页中的出院主要诊断, ,出院其他诊断&#xff08;病案首页中原始信息&#xff09;, , , , ,…...

猫眼 校园招聘_1面

&#xff08;1&#xff09;打包和构建工具 vite 和 webpack 功能 1. 构建原理&#xff1a; Webpack 是一个静态模块打包器&#xff0c;通过对项目中的JavaScript、css、Image 等文件进行分析&#xff0c;生成对应的静态资源&#xff0c;并且通过一些插件和加载器来实现各种功…...

博弈论——博弈信息结构

博弈信息结构 0 引言 在一个博弈构成中&#xff0c;博弈信息结构是不可或缺要素。博弈信息&#xff0c;顾名思义&#xff0c;就是在博弈中&#xff0c;博弈方对于信息的了解。知己知彼&#xff0c;百战不殆。和短兵相接的战争一样&#xff0c;只有充分了解自己的优劣势&#x…...

求二叉树的高度——函数递归的思想

二叉树的高度&#xff1a;左右两个数最高的那个的1 int TreeHight(BTNode* root) {if (root NULL){return 0;}int lefhightTreeHight(root->left);int righthight TreeHight(root->right);return lefhight > righthight ? TreeHight(root->left) 1 : TreeHight…...

ue5蓝图请求接口

安装与使用 1、在虚幻商城搜索 VaRest 插件 2、选择自己项目的对应版本安装 3、查看是否安装成功 4、进入项目后&#xff0c;分别启动VaRest、JSON Blueprint Utilities两个插件&#xff08;勾选后会提示重启项目&#xff09; 5、基本用法&#xff1a;打开关卡蓝图使用&#xf…...

windows server 2012 查看已打了哪些补丁

打开控制面板 点击卸载程序 点击 查看已安装的更新 下图是已安装的补丁...

参加CSP-J第一轮后的感受

本人现在初二。作为一名学了4年多c的人&#xff0c;我一直都挺想考过CSP。于是&#xff0c;去年我就去考了。 当时初一&#xff0c;感觉自己实力不够&#xff0c;就只报了J组的。果不其然&#xff0c;63分&#xff0c;没过。 经过1年的苦练&#xff0c;今年又去考了。 J组78分&…...

rust 智能指针

智能指针 Box Box 的使用场景 由于 Box 是简单的封装&#xff0c;除了将值存储在堆上外&#xff0c;并没有其它性能上的损耗。而性能和功能往往是鱼和熊掌&#xff0c;因此 Box 相比其它智能指针&#xff0c;功能较为单一&#xff0c;可以在以下场景中使用它&#xff1a; 特…...

CentOS 7系统安装配置Zabbix 5.0LTS 步骤

目录 一、查看Zabbix官方教程&#xff08;重点&#xff09; 二、安装 Docker 创建 Mysql 容器 安装 Docker 依赖包 添加 Docker 官方仓库 安装 Docker 引擎 启动 Docker 服务并设置开机自启 验证 Docker 是否成功安装 拉取 MySQL 镜像 查看本地镜像 运行容器 停止和启…...

【学习之路】Multi Agent Reinforcement Learning框架与代码

【学习之路】Multi Agent Reiforcement Learning框架与代码 Introduction 国庆期间&#xff0c;有个客户找我写个代码&#xff0c;是强化学习相关的&#xff0c;但我没学过&#xff0c;心里那是一个慌&#xff0c;不过好在经过详细的调研以及自身的实力&#xff0c;最后还是解…...

android 13.0 SystemUI导航栏添加虚拟按键功能(二)

1.概述 在13.0的系统产品开发中,对于在SystemUI的原生系统中默认只有三键导航,想添加其他虚拟按键就需要先在构建导航栏的相关布局 中分析结构,然后添加相关的图标xml就可以了,然后添加对应的点击事件,就可以了,接下来先分析第二步关于导航栏的相关布局情况 然后实现功能…...

Java8 新特性之Stream(二)-- Stream的中间操作

目录 1.filter(Predicate) 2.map(Function) 3.flatMap(Function) 4.distinct() 5.sorted([Comparator]) 6.limit(n) 7.skip(n) 8.peek(Consumer)...

CA与区块链之数字签名详解

CA与区块链验证本质上都是数字签名&#xff0c;首先&#xff0c;我们看一下什么是数字签名&#xff01; 数字签名 数字签名是公钥密码学中的一种技术&#xff0c;用于验证信息的完整性和发送者的身份。简而言之&#xff0c;数字签名是一种确认信息来源和信息完整性的手段。它通…...

一文解读如何应用 REST 对资源进行访问?

文章目录 一、REST 简介二、涉及注解2.1 RequestMapping2.2 PathVariable2.3 RestController2.4 GetMapping、PostMapping、PutMapping、DeleteMapping补充&#xff1a;PathVariable、RequestBody、RequestParam 区别与应用 三、REST风格案例 一、REST 简介 REST (Representat…...

使用JAVA发送邮件

这里用java代码编写发送邮件我采用jar包&#xff0c;需要先点击这里下载三个jar包&#xff1a;这三个包分别为&#xff1a;additionnal.jar&#xff1b;activation.jar&#xff1b;mail.jar。这三个包缺一不可&#xff0c;如果少添加或未添加均会报下面这个错误&#xff1a; C…...

【JavaEE】_servlet程序的编写方法

目录 1. 创建项目 2. 引入依赖 3. 创建目录结构 3.1 在main目录下创建一个webapp目录 3.2 在webapp目录下创建一个WEB-INF目录 3.3 在WEB-INF目录下创建一个web.xml文件 3.4 在web.xml中进行代码编写 4. 编写代码 4.1 在java目录下创建类 4.2 打印"hello world&…...

美国市场三星手机超苹果 中国第一属华为

报告显示&#xff0c;截至5月份的三个月&#xff0c;iOS系统在美国、澳大利亚以及日本表现不俗。Android系统份额则在英国、德国以及法国实现增长。在中国城市地区&#xff0c;iOS份额同比基本持平&#xff0c;而Android份额则达到80.5%&#xff0c;同比增长1个百分点。 三星在…...

nodejs+vue+elementui医院挂号预约管理系统4n9w0

前端技术&#xff1a;nodejsvueelementui 前端&#xff1a;HTML5,CSS3、JavaScript、VUE 1、 node_modules文件夹(有npn install Express 框架于Node运行环境的Web框架, 开发语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff…...

调试技巧(课件图解)

...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...