当前位置: 首页 > news >正文

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

kafka尚硅谷视频:

10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

     1. producer初始化:加载默认配置,以及配置的参数,开启网络线程

     2. 拦截器拦截

     3. 序列化器进行消息key, value序列化

     4. 进行分区

     5. kafka broker集群 获取metaData

     6. 消息缓存到RecordAccumulator收集器,分配到该分区的DQueue(RecordBatch)

     7. batch.size满了,或者linker.ms到达指定时间,唤醒sender线程, 实例化networkClient

         RecordBatch ==>RequestClient 发送消息体,

      8. 与分区相同broker建立网络连接,发送到对应broker

 1. send()方法参数producerRecord对象:

    关于分区:

      a.指定分区,则发送到该分区    

      b.不指定分区,k值没有传入,使用黏性分区(sticky partition

                 第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法   

      c.不指定分区,传入k值,k值先进行hash获取hashCodeValue, 再与topic下的分区数进行求模取余,进行分区。

      如 k hash = 5 topic目前的分区数2  则 分区为:1

          k  hash =6  topic目前的分区数2  则 分区为:0

2. KafkaProducer 异步, 同步发送api:

    异步发送:

                    producer.send(producerRecord对象);

    同步发送则send()方法后面.get()



kafka 的send方法核心逻辑:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {return this.send(record, (Callback)null);}public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// 拦截器集合。多个拦截对象循环遍历ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return this.doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;// 获取集群信息metadatatry {this.throwIfProducerClosed();long nowMs = this.time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);} catch (KafkaException var22) {if (this.metadata.isClosed()) {throw new KafkaException("Producer closed while send in progress", var22);}throw var22;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;// 序列化器 key序列化byte[] serializedKey;try {serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var21) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);}// 序列化器 value序列化byte[] serializedValue;try {serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException var20) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);}// 分区int partition = this.partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);this.setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);this.ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (this.log.isTraceEnabled()) {this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});}Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);// RecordAccumulator.append() 添加数据转 ProducerBatchRecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;this.partitioner.onNewBatch(record.topic(), cluster, partition);partition = this.partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (this.log.isTraceEnabled()) {this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});}interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (this.transactionManager != null) {this.transactionManager.maybeAddPartition(tp);}// 判断是否满了,满了唤醒sender , sender继承了runnableif (result.batchIsFull || result.newBatchCreated) {this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;} catch (ApiException var23) {this.log.debug("Exception occurred during message send:", var23);if (tp == null) {tp = ProducerInterceptors.extractTopicPartition(record);}Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);interceptCallback.onCompletion((RecordMetadata)null, var23);this.errors.record();this.interceptors.onSendError(record, tp, var23);return new FutureFailure(var23);} catch (InterruptedException var24) {this.errors.record();this.interceptors.onSendError(record, tp, var24);throw new InterruptException(var24);} catch (KafkaException var25) {this.errors.record();this.interceptors.onSendError(record, tp, var25);throw var25;} catch (Exception var26) {this.interceptors.onSendError(record, tp, var26);throw var26;}}

  Sender类 run()方法:

 public void run() {this.log.debug("Starting Kafka producer I/O thread.");while(this.running) {try {this.runOnce();} catch (Exception var5) {this.log.error("Uncaught error in kafka producer I/O thread: ", var5);}}this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {try {this.runOnce();} catch (Exception var4) {this.log.error("Uncaught error in kafka producer I/O thread: ", var4);}}while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {if (!this.transactionManager.isCompleting()) {this.log.info("Aborting incomplete transaction due to shutdown");this.transactionManager.beginAbort();}try {this.runOnce();} catch (Exception var3) {this.log.error("Uncaught error in kafka producer I/O thread: ", var3);}}if (this.forceClose) {if (this.transactionManager != null) {this.log.debug("Aborting incomplete transactional requests due to forced shutdown");this.transactionManager.close();}this.log.debug("Aborting incomplete batches due to forced shutdown");this.accumulator.abortIncompleteBatches();}try {this.client.close();} catch (Exception var2) {this.log.error("Failed to close network client", var2);}this.log.debug("Shutdown of Kafka producer I/O thread has completed.");}void runOnce() {if (this.transactionManager != null) {try {this.transactionManager.maybeResolveSequences();if (this.transactionManager.hasFatalError()) {RuntimeException lastError = this.transactionManager.lastError();if (lastError != null) {this.maybeAbortBatches(lastError);}this.client.poll(this.retryBackoffMs, this.time.milliseconds());return;}this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();if (this.maybeSendAndPollTransactionalRequest()) {return;}} catch (AuthenticationException var5) {this.log.trace("Authentication exception while processing transactional request", var5);this.transactionManager.authenticationFailed(var5);}}long currentTimeMs = this.time.milliseconds();// 发送数据long pollTimeout = this.sendProducerData(currentTimeMs);this.client.poll(pollTimeout, currentTimeMs);}

  sendProducerData() :

      最终转换为ClientRequest对象

         ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);this.client.send(clientRequest, now);
private long sendProducerData(long now) {Cluster cluster = this.metadata.fetch();RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);Iterator iter;if (!result.unknownLeaderTopics.isEmpty()) {iter = result.unknownLeaderTopics.iterator();while(iter.hasNext()) {String topic = (String)iter.next();this.metadata.add(topic, now);}this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);this.metadata.requestUpdate();}iter = result.readyNodes.iterator();long notReadyTimeout = Long.MAX_VALUE;while(iter.hasNext()) {Node node = (Node)iter.next();if (!this.client.ready(node, now)) {iter.remove();notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));}}Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);this.addToInflightBatches(batches);List expiredBatches;Iterator var11;ProducerBatch expiredBatch;if (this.guaranteeMessageOrder) {Iterator var9 = batches.values().iterator();while(var9.hasNext()) {expiredBatches = (List)var9.next();var11 = expiredBatches.iterator();while(var11.hasNext()) {expiredBatch = (ProducerBatch)var11.next();this.accumulator.mutePartition(expiredBatch.topicPartition);}}}this.accumulator.resetNextBatchExpiryTime();List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);expiredBatches = this.accumulator.expiredBatches(now);expiredBatches.addAll(expiredInflightBatches);if (!expiredBatches.isEmpty()) {this.log.trace("Expired {} batches in accumulator", expiredBatches.size());}var11 = expiredBatches.iterator();while(var11.hasNext()) {expiredBatch = (ProducerBatch)var11.next();String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);if (this.transactionManager != null && expiredBatch.inRetry()) {this.transactionManager.markSequenceUnresolved(expiredBatch);}}this.sensors.updateProduceRequestMetrics(batches);long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);pollTimeout = Math.max(pollTimeout, 0L);if (!result.readyNodes.isEmpty()) {this.log.trace("Nodes with data ready to send: {}", result.readyNodes);pollTimeout = 0L;}this.sendProduceRequests(batches, now);return pollTimeout;}private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {Iterator var4 = collated.entrySet().iterator();while(var4.hasNext()) {Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());}}private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {if (!batches.isEmpty()) {Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();Iterator var9 = batches.iterator();while(var9.hasNext()) {ProducerBatch batch = (ProducerBatch)var9.next();if (batch.magic() < minUsedMagic) {minUsedMagic = batch.magic();}}ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();Iterator var16 = batches.iterator();while(var16.hasNext()) {ProducerBatch batch = (ProducerBatch)var16.next();TopicPartition tp = batch.topicPartition;MemoryRecords records = batch.records();if (!records.hasMatchingMagic(minUsedMagic)) {records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();}ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());if (tpData == null) {tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());tpd.add(tpData);}tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));recordsByPartition.put(tp, batch);}String transactionalId = null;if (this.transactionManager != null && this.transactionManager.isTransactional()) {transactionalId = this.transactionManager.transactionalId();}ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));RequestCompletionHandler callback = (response) -> {this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());};String nodeId = Integer.toString(destination);ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);// this.client 为KafkaClient接口 实现类:NetworkClient对象this.client.send(clientRequest, now);this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);}}

 NetworkClient send()方法:

 public void send(ClientRequest request, long now) {this.doSend(request, false, now);}private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {this.ensureActive();String nodeId = clientRequest.destination();if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");} else {AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();try {NodeApiVersions versionInfo = this.apiVersions.get(nodeId);short version;if (versionInfo == null) {version = builder.latestAllowedVersion();if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});}} else {version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());}this.doSend(clientRequest, isInternalRequest, now, builder.build(version));} catch (UnsupportedVersionException var9) {this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);if (!isInternalRequest) {this.abortedSends.add(clientResponse);} else if (clientRequest.apiKey() == ApiKeys.METADATA) {this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));}}}}private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {String destination = clientRequest.destination();RequestHeader header = clientRequest.makeHeader(request.version());if (this.log.isDebugEnabled()) {this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});}Send send = request.toSend(header);// clientRequest convert InFlightRequest 对象InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);this.inFlightRequests.add(inFlightRequest);// nio channel。。。selector 发送消息信息//this.selector is Selectable interface  KafkaChannel is implementthis.selector.send(new NetworkSend(clientRequest.destination(), send));}

总结:直接阅读源码很快就能想明白kafka 生产者发送逻辑,kafka-client.jar。  核心==>

   本文第一张图片

相关文章:

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

kafka尚硅谷视频&#xff1a; 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​ 1. producer初始化&#xff1a;加载默认配置&#xff0c;以及配置的参数&#xff0c;开启网络线程 2. 拦截器拦截 3. 序列化器进行消息key, value序列化 4. 进行分区 5. kafka broker集群 获取…...

Uniapp笔记(七)uniapp打包

一、项目打包 1、h5打包 登录dcloud账户&#xff0c;在manifest.json的基础配置选项中&#xff0c;点击重新获取uniapp应用标识APPID 在manifest.json的Web配置选项的运行的基础路径中输入./ 在菜单栏的发行栏目&#xff0c;点击网站-PC或手机H5 输入网站标题和网站域名&am…...

软考高级系统架构设计师系列论文七十六:论基于构件的软件开发

软考高级系统架构设计师系列论文七十六:论基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...

基于Thinkphp6框架全新UI的AI网址导航系统源码

2023全新UI的AI网址导航系统源码&#xff0c;基于thinkphp6框架开发的 AI 网址导航是一个非常实用的工具&#xff0c;它能够帮助用户方便地浏览和管理自己喜欢的网站。 相比于其他的 AI 网址导航&#xff0c;这个项目使用了更加友好和易用的 ThinkPHP 框架进行搭建&#xff0c;…...

Html 补充

accesskey 设置快捷键 Alt设定的键 <a href"https://blog.csdn.net/lcatake/article/details/131716967?spm1001.2014.3001.5501" target"_blank" accesskey"i">我的博客</a> contenteditable 使文本可编译 默认为false 对输入框无…...

Visual Studio编译出来的程序无法在其它电脑上运行

在其它电脑&#xff08;比如Windows Server 2012&#xff09;上运行Visual Studio编译出来的应用程序&#xff0c;结果报错&#xff1a;“无法启动此程序&#xff0c;因为计算机中丢失VCRUNTIME140.dll。尝试重新安装该程序以解决此问题。” 解决方法&#xff1a; 属性 -> …...

习题练习 C语言(暑期第二弹)

编程能力小提升&#xff01; 前言一、表达式判断二、Assii码的理解应用三、循环跳出判断四、数字在升序数组中出现的次数五、整数转换六、循环语句的应用七、函数调用八、两个数组的交集九、C语言基础十、图片整理十一、数组的引用十二、数组的引用十三、字符个数统计十四、多数…...

树莓派使用Nginx+cpolar内网穿透实现无公网IP访问内网本地站点

文章目录 1. Nginx安装2. 安装cpolar3.配置域名访问Nginx4. 固定域名访问5. 配置静态站点 安装 Nginx&#xff08;发音为“engine-x”&#xff09;可以将您的树莓派变成一个强大的 Web 服务器&#xff0c;可以用于托管网站或 Web 应用程序。相比其他 Web 服务器&#xff0c;Ngi…...

攻防世界-Web_php_unserialize

原题 解题思路 注释说了flag存在f14g.php中&#xff0c;但是在wakeup函数中&#xff0c;会把传入的文件名变成index.php。看wp知道&#xff0c;如果被反序列话的字符串其中对应的对象的属性个数发生变化时&#xff0c;会导致反序列化失败而同时使得__wakeup 失效&#xff08;CV…...

云化背景下的接口测试覆盖率自动化检查

一、问题来源 在云化场景下&#xff0c;API的测试覆盖是一项重要评估与考察指标。除了开发者自测试外&#xff08;UT&#xff09;&#xff0c;还可以利用云化测试平台、流水线等方法进行相关指标的检查与考核。利用这种方法既可以减轻开发者测试工作量&#xff0c;不必在本地做…...

QCC_BES 音频重采样算法实现

+V hezkz17进数字音频系统研究开发交流答疑群(课题组) 这段代码是一个用于将音频数据进行立体声重采样的函数。以下是对代码的解读: 函数接受以下参数: pcm_buf:16位有符号整型的音频缓冲区,存储了输入的音频数据。pcm_len:音频缓冲区的长度。mic1:16位有符号整型的音频…...

如何使用CSS实现一个3D旋转效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 3D效果实现⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域…...

联想电脑装系统无法按F9后无法从系统盘启动的解决方案

开机时按F9发现没有加载系统盘. 打开BIOS设置界面&#xff0c;调整设置如下: BOOT MODE: Legacy Support.允许legacy方式boot. BOOT PRIORITY: Legacy First. Legacy方式作为首选的boot方式. USB BOOT: ENABLED. 允许以usb方式boot. Legacy: 这里设置legacy boot的优先级,…...

AMEYA360:大唐恩智浦电池管理芯片DNB1168-新能源汽车BMS系统的选择

DNB1168是一款全球独有的集成&#xff08;EIS&#xff09;交流阻抗谱监测功能的单电池监测芯片。该芯片通过车规级AEC-Q100和汽车行业最高功能安全等级ISO 26262&#xff1a;2018 ASIL-D双重认证。芯片?内部集成多种高精度电池参数监测&#xff0c;支持电压、温度、交流阻抗检…...

【Python进阶学习】【Excel读写】使用openpyxl写入xlsx文件

1、当前文件不存在指定的子文件夹则创建 2、文件存在追加写入 3、文件不存在创建文件并写入表头 # -*- coding: utf-8 -*- import openpyxl as xl import osdef write_excel_file(folder_path):if not os.path.exists(folder_path):os.makedirs(folder_path)result_path os.p…...

Docker(md版)

Docker 一、Docker二、更换apt源三、docker搭建四、停启管理五、配置加速器5.1、方法一5.2、方法二 六、使用docker运行漏洞靶场1、拉取tomcat8镜像2、拉取成功3、开启服务4、查看kali的IP地址5、访问靶场6、关闭漏洞靶场 七、vulapps靶场搭建 一、Docker Docker是一个开源的应…...

如何使用CSS实现一个无限循环滚动的图片轮播效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐HTML 结构⭐ CSS 样式⭐ JavaScript 控制⭐ 注意事项&#xff1a;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff0…...

你使用过WebSocket吗?

什么是WebSocket&#xff1f; WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议&#xff0c;它的出现是为了解决 Web 应用中实时通信的需求。传统的 HTTP 协议是基于请求-响应模式的&#xff0c;即客户端发送请求&#xff0c;服务器响应请求&#xff0c;然后连接关闭。…...

Spark整合hive的时候出错

Spark整合hive的时候 连接Hdfs不从我hive所在的机器上找&#xff0c;而是去连接我的集群里的另外两台机器 但是我的集群没有开 所以下面就一直在retry 猜测&#xff1a; 出现这个错误的原因可能与core-site.xml和hdfs-site.xml有关&#xff0c;因为这里面配置了集群的nameno…...

SocketTools.NET 11.0.2148.1554 Crack

添加新功能以简化使用 URL 建立 TCP 连接的过程。 2023 年 8 月 23 日 - 12:35新版本 特征 添加了“HttpGetTextEx”函数&#xff0c;该函数在返回字符串缓冲区中的文本内容时提供附加选项。添加了对“FileTransfer”.NET 类和 ActiveX 控件中的“GetText”和“PutText”方法的…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

AtCoder 第409​场初级竞赛 A~E题解

A Conflict 【题目链接】 原题链接&#xff1a;A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串&#xff0c;只有在同时为 o 时输出 Yes 并结束程序&#xff0c;否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...