当前位置: 首页 > news >正文

Flink 中kafka broker缩容导致Task一直重启

背景

Flink版本 1.12.2
Kafka 客户端 2.4.1
在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker,而当时flink配置了12台kafka broker),当时具体的现场如下:

JobManaer上的日志如下:
2023-10-07 10:02:52.975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)对应的 TaskManager(task-xxxx-shanghai.emr.aliyuncs.com)上的日志如下:2023-10-07 10:02:24.604 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.2023-10-07 10:02:52.939 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected2023-10-07 10:08:15.541 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

当时Flink中kafka source的相关配置如下:

scan.topic-partition-discovery.interval  300000
restart-strategy.type fixed-delay
restart-strategy.fixed-delay.attempts 50000000
jobmanager.execution.failover-strategy region

结论以及解决

目前在kafka 消费端有两个参数default.api.timeout.ms(默认60000),request.timeout.ms(默认30000),这两个参数来控制kakfa的客户端从服务端请求超时,也就是说每次请求的超时时间是30s,超时之后可以再重试,如果在60s内请求没有得到任何回应,则会报TimeOutException,具体的见如下分析,
我们在flink kafka connector中通过设置如下参数来解决:

`properties.default.api.timeout.ms` = '600000',
`properties.request.timeout.ms` = '5000',
// max.block.ms是设置kafka producer的超时
`properties.max.block.ms` = '600000',

分析

在Flink中对于Kafka的Connector的DynamicTableSourceFactoryKafkaDynamicTableFactory,这里我们只讨论kafka作为source的情况,
而该类的方法createDynamicTableSource最终会被调用,至于具体的调用链可以参考Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用–只不过把Sink改成Source就可以了,所以最终会到KafkaDynamicSource类:

@Overridepublic ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {final DeserializationSchema<RowData> keyDeserialization =createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);final DeserializationSchema<RowData> valueDeserialization =createDeserialization(context, valueDecodingFormat, valueProjection, null);final TypeInformation<RowData> producedTypeInfo =context.createTypeInformation(producedDataType);final FlinkKafkaConsumer<RowData> kafkaConsumer =createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);return SourceFunctionProvider.of(kafkaConsumer, false);}

该类的getScanRuntimeProvider方法会被调用,所有kafka相关的操作都可以追溯到FlinkKafkaConsumer类(继承FlinkKafkaConsumerBase)中,对于该类重点的方法如下:

    @Overridepublic final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));... }@Overridepublic void open(Configuration configuration) throws Exception {// determine the offset commit modethis.offsetCommitMode =OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());// create the partition discovererthis.partitionDiscoverer =createPartitionDiscoverer(topicsDescriptor,getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks());this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>();final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();if (restoredState != null) {...} else {// use the partition discoverer to fetch the initial seed partitions,// and set their initial offsets depending on the startup mode.// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily// determined// when the partition is actually read.switch (startupMode) {。。。default:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}}if (!subscribedPartitionsToStartOffsets.isEmpty()) {switch (startupMode) {...case GROUP_OFFSETS:LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets.size(),subscribedPartitionsToStartOffsets.keySet());}} else {LOG.info("Consumer subtask {} initially has no partitions to read from.",getRuntimeContext().getIndexOfThisSubtask());}}this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));}@Overridepublic void run(SourceContext<T> sourceContext) throws Exception {if (subscribedPartitionsToStartOffsets == null) {throw new Exception("The partitions were not set for the consumer");}// initialize commit metrics and default offset callback methodthis.successfulCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);this.failedCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();this.offsetCommitCallback =new KafkaCommitCallback() {@Overridepublic void onSuccess() {successfulCommits.inc();}@Overridepublic void onException(Throwable cause) {LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.",subtaskIndex),cause);failedCommits.inc();}};// mark the subtask as temporarily idle if there are no initial seed partitions;// once this subtask discovers some partitions and starts collecting records, the subtask's// status will automatically be triggered back to be active.if (subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}LOG.info("Consumer subtask {} creating fetcher with offsets {}.",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets);// from this point forward://   - 'snapshotState' will draw offsets from the fetcher,//     instead of being built from `subscribedPartitionsToStartOffsets`//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to//     Kafka through the fetcher, if configured to do so)this.kafkaFetcher =createFetcher(sourceContext,subscribedPartitionsToStartOffsets,watermarkStrategy,(StreamingRuntimeContext) getRuntimeContext(),offsetCommitMode,getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),useMetrics);if (!running) {return;}if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {kafkaFetcher.runFetchLoop();} else {runWithPartitionDiscovery();}}@Overridepublic final void snapshotState(FunctionSnapshotContext context) throws Exception {...HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}... }}@Overridepublic final void notifyCheckpointComplete(long checkpointId) throws Exception {...fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);...}

主要是initializeStateopen,run,snapshotState,notifyCheckpointComplete这四个方法,下面带着问题逐一介绍一下:
注意:对于initializeStateopen方法的先后顺序,可以参考StreamTask类,其中如下的调用链:

invoke()||\/
beforeInvoke()||\/
operatorChain.initializeStateAndOpenOperators||\/
FlinkKafkaConsumerBase.initializeState||\/
FlinkKafkaConsumerBase.open

就可以知道 initializeState方法的调用是在open之前的

initializeState方法

这里做的事情就是从持久化的State中恢复kafkaTopicOffset信息,我们这里假设是第一次启动

open方法

  • offsetCommitMode
    offsetCommitMode = OffsetCommitModes.fromConfiguration 这里获取设置的kafka offset的提交模式,这里会综合enable.auto.commit的配置(默认是true),enableCommitOnCheckpoints默认是true,checkpointing设置为true(默认是false),综合以上得到的值为OffsetCommitMode.ON_CHECKPOINTS
  • partitionDiscoverer
    这里主要是进行kafka的topic的分区发现,主要路程是 partitionDiscoverer.discoverPartitions,这里的涉及的流程如下:
    AbstractPartitionDiscoverer.discoverPartitions||\/
    AbstractPartitionDiscoverer.getAllPartitionsForTopics ||\/
    KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor||\/
    KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //这里的defaultApiTimeoutMs 来自于*default.api.timeout.ms*||\/
    Fetcher.getTopicMetadata //这里面最后抛出 new TimeoutException("Timeout expired while fetching topic metadata");||\/
    Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //这里会根据某种策略选择配置的broker的节点||\/
    client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 这里的 *defaultRequestTimeoutMs* 来自配置*request.timeout.ms*
    综上所述,discoverPartitions做的就是根据某种策略选择配置的broker节点,对每个节点进行请求,request.timeout.ms超时后,再根据策略选择broker,直至总的时间达到了配置的default.api.timeout.ms,这里默认default.api.timeout.ms 为60秒,request.timeout.ms为30秒
  • subscribedPartitionsToStartOffsets
    根据startupMode模式,默认是StartupMode.GROUP_OFFSETS(默认从上次消费的offset开始消费),设置开启的kafka offset,这在kafkaFetcher中会用到

run方法

  • 设置一些指标successfulCommits/failedCommits
  • KafkaFetcher
    这里主要是从kafka获取数据以及如果有分区发现则循环进kafka的topic分区发现,这里会根据配置scan.topic-partition-discovery.interval默认配置为0,实际中设置的为300000,即5分钟。该主要的流程为在方法runWithPartitionDiscovery:
      private void runWithPartitionDiscovery() throws Exception {final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();createAndStartDiscoveryLoop(discoveryLoopErrorRef);kafkaFetcher.runFetchLoop();// make sure that the partition discoverer is waked up so that// the discoveryLoopThread exitspartitionDiscoverer.wakeup();joinDiscoveryLoopThread();// rethrow any fetcher errorsfinal Exception discoveryLoopError = discoveryLoopErrorRef.get();if (discoveryLoopError != null) {throw new RuntimeException(discoveryLoopError);}}
    • createAndStartDiscoveryLoop 这个会启动单个线程以while sleep方式实现以scan.topic-partition-discovery.interval为间隔来轮询进行Kafka的分区发现,注意这里会吞没Execption,并不会抛出异常

       private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {discoveryLoopThread =new Thread(...while (running) {...try {discoveredPartitions =partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException| AbstractPartitionDiscoverer.ClosedException e) {break;}if (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}if (running && discoveryIntervalMillis != 0) {try {Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {break;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}},"Kafka Partition Discovery for "+ getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start();
      }
      

      这里的kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);subscribedPartitionStates变量会把发现分区信息保存起来,这在kafkaFetcher.runFetchLoop中会设置已经提交的offset信息,并且会在snapshotState会用到

    • kafkaFetcher.runFetchLoop 这里会从kafka拉取数据,并设置kafka的offset,具体的流程如下:

       runFetchLoop ||\/subscribedPartitionStates 这里会获取*subscribedPartitionStates*变量||\/partitionConsumerRecordsHandler||\/emitRecordsWithTimestamps||\/emitRecordsWithTimestamps||\/partitionState.setOffset(offset);
      

      这里的offset就是从消费的kafka记录中获取的

snapshotState方法

这里会对subscribedPartitionStates中的信息进行处理,主要是加到pendingOffsetsToCommit变量中

  • offsetCommitMode
    这里上面说到是OffsetCommitMode.ON_CHECKPOINTS,如果是ON_CHECKPOINTS,则会从fetcher.snapshotCurrentState获取subscribedPartitionStates
    并加到pendingOffsetsToCommit,并持久化到unionOffsetStates中,这实际的kafka offset commit操作在notifyCheckpointComplete中,

notifyCheckpointComplete方法

获取到要提交的kafka offset信息,并持久化保存kafka中

参考

  • open 和 initailizeState的初始化顺序
  • A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata

相关文章:

Flink 中kafka broker缩容导致Task一直重启

背景 Flink版本 1.12.2 Kafka 客户端 2.4.1 在公司的Flink平台运行了一个读Kafka计算DAU的流程序&#xff0c;由于公司Kafka的缩容&#xff0c;直接导致了该程序一直在重启&#xff0c;重启了一个小时都还没恢复&#xff08;具体的所容操作是下掉了四台kafka broker&#xff0…...

纯前端js中使用sheetjs导出excel,并且合并标题

先定义变量----用的是Vue2 &#xff0c;以下在vue的data&#xff1a;{}中定义--------------//空格占位符 headerTopTitle: [患者信息, , , , , , , , , 入出院信息, , , , , , , 病案首页中的出院主要诊断, ,出院其他诊断&#xff08;病案首页中原始信息&#xff09;, , , , ,…...

猫眼 校园招聘_1面

&#xff08;1&#xff09;打包和构建工具 vite 和 webpack 功能 1. 构建原理&#xff1a; Webpack 是一个静态模块打包器&#xff0c;通过对项目中的JavaScript、css、Image 等文件进行分析&#xff0c;生成对应的静态资源&#xff0c;并且通过一些插件和加载器来实现各种功…...

博弈论——博弈信息结构

博弈信息结构 0 引言 在一个博弈构成中&#xff0c;博弈信息结构是不可或缺要素。博弈信息&#xff0c;顾名思义&#xff0c;就是在博弈中&#xff0c;博弈方对于信息的了解。知己知彼&#xff0c;百战不殆。和短兵相接的战争一样&#xff0c;只有充分了解自己的优劣势&#x…...

求二叉树的高度——函数递归的思想

二叉树的高度&#xff1a;左右两个数最高的那个的1 int TreeHight(BTNode* root) {if (root NULL){return 0;}int lefhightTreeHight(root->left);int righthight TreeHight(root->right);return lefhight > righthight ? TreeHight(root->left) 1 : TreeHight…...

ue5蓝图请求接口

安装与使用 1、在虚幻商城搜索 VaRest 插件 2、选择自己项目的对应版本安装 3、查看是否安装成功 4、进入项目后&#xff0c;分别启动VaRest、JSON Blueprint Utilities两个插件&#xff08;勾选后会提示重启项目&#xff09; 5、基本用法&#xff1a;打开关卡蓝图使用&#xf…...

windows server 2012 查看已打了哪些补丁

打开控制面板 点击卸载程序 点击 查看已安装的更新 下图是已安装的补丁...

参加CSP-J第一轮后的感受

本人现在初二。作为一名学了4年多c的人&#xff0c;我一直都挺想考过CSP。于是&#xff0c;去年我就去考了。 当时初一&#xff0c;感觉自己实力不够&#xff0c;就只报了J组的。果不其然&#xff0c;63分&#xff0c;没过。 经过1年的苦练&#xff0c;今年又去考了。 J组78分&…...

rust 智能指针

智能指针 Box Box 的使用场景 由于 Box 是简单的封装&#xff0c;除了将值存储在堆上外&#xff0c;并没有其它性能上的损耗。而性能和功能往往是鱼和熊掌&#xff0c;因此 Box 相比其它智能指针&#xff0c;功能较为单一&#xff0c;可以在以下场景中使用它&#xff1a; 特…...

CentOS 7系统安装配置Zabbix 5.0LTS 步骤

目录 一、查看Zabbix官方教程&#xff08;重点&#xff09; 二、安装 Docker 创建 Mysql 容器 安装 Docker 依赖包 添加 Docker 官方仓库 安装 Docker 引擎 启动 Docker 服务并设置开机自启 验证 Docker 是否成功安装 拉取 MySQL 镜像 查看本地镜像 运行容器 停止和启…...

【学习之路】Multi Agent Reinforcement Learning框架与代码

【学习之路】Multi Agent Reiforcement Learning框架与代码 Introduction 国庆期间&#xff0c;有个客户找我写个代码&#xff0c;是强化学习相关的&#xff0c;但我没学过&#xff0c;心里那是一个慌&#xff0c;不过好在经过详细的调研以及自身的实力&#xff0c;最后还是解…...

android 13.0 SystemUI导航栏添加虚拟按键功能(二)

1.概述 在13.0的系统产品开发中,对于在SystemUI的原生系统中默认只有三键导航,想添加其他虚拟按键就需要先在构建导航栏的相关布局 中分析结构,然后添加相关的图标xml就可以了,然后添加对应的点击事件,就可以了,接下来先分析第二步关于导航栏的相关布局情况 然后实现功能…...

Java8 新特性之Stream(二)-- Stream的中间操作

目录 1.filter(Predicate) 2.map(Function) 3.flatMap(Function) 4.distinct() 5.sorted([Comparator]) 6.limit(n) 7.skip(n) 8.peek(Consumer)...

CA与区块链之数字签名详解

CA与区块链验证本质上都是数字签名&#xff0c;首先&#xff0c;我们看一下什么是数字签名&#xff01; 数字签名 数字签名是公钥密码学中的一种技术&#xff0c;用于验证信息的完整性和发送者的身份。简而言之&#xff0c;数字签名是一种确认信息来源和信息完整性的手段。它通…...

一文解读如何应用 REST 对资源进行访问?

文章目录 一、REST 简介二、涉及注解2.1 RequestMapping2.2 PathVariable2.3 RestController2.4 GetMapping、PostMapping、PutMapping、DeleteMapping补充&#xff1a;PathVariable、RequestBody、RequestParam 区别与应用 三、REST风格案例 一、REST 简介 REST (Representat…...

使用JAVA发送邮件

这里用java代码编写发送邮件我采用jar包&#xff0c;需要先点击这里下载三个jar包&#xff1a;这三个包分别为&#xff1a;additionnal.jar&#xff1b;activation.jar&#xff1b;mail.jar。这三个包缺一不可&#xff0c;如果少添加或未添加均会报下面这个错误&#xff1a; C…...

【JavaEE】_servlet程序的编写方法

目录 1. 创建项目 2. 引入依赖 3. 创建目录结构 3.1 在main目录下创建一个webapp目录 3.2 在webapp目录下创建一个WEB-INF目录 3.3 在WEB-INF目录下创建一个web.xml文件 3.4 在web.xml中进行代码编写 4. 编写代码 4.1 在java目录下创建类 4.2 打印"hello world&…...

美国市场三星手机超苹果 中国第一属华为

报告显示&#xff0c;截至5月份的三个月&#xff0c;iOS系统在美国、澳大利亚以及日本表现不俗。Android系统份额则在英国、德国以及法国实现增长。在中国城市地区&#xff0c;iOS份额同比基本持平&#xff0c;而Android份额则达到80.5%&#xff0c;同比增长1个百分点。 三星在…...

nodejs+vue+elementui医院挂号预约管理系统4n9w0

前端技术&#xff1a;nodejsvueelementui 前端&#xff1a;HTML5,CSS3、JavaScript、VUE 1、 node_modules文件夹(有npn install Express 框架于Node运行环境的Web框架, 开发语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff…...

调试技巧(课件图解)

...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障

关键领域软件测试的"安全密码"&#xff1a;Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力&#xff0c;从金融交易到交通管控&#xff0c;这些关乎国计民生的关键领域…...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

消防一体化安全管控平台:构建消防“一张图”和APP统一管理

在城市的某个角落&#xff0c;一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延&#xff0c;滚滚浓烟弥漫开来&#xff0c;周围群众的生命财产安全受到严重威胁。就在这千钧一发之际&#xff0c;消防救援队伍迅速行动&#xff0c;而豪越科技消防一体化安全管控平台构建的消防“…...

VisualXML全新升级 | 新增数据库编辑功能

VisualXML是一个功能强大的网络总线设计工具&#xff0c;专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑&#xff08;如DBC、LDF、ARXML、HEX等&#xff09;&#xff0c;并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...