当前位置: 首页 > news >正文

消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

相关文章:

消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址 源码&#xff1a;https://kafka.apache.org/downloads 快速开始&#xff1a;https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程&#xff1a;主线程只负责组织消息&#xff0c;如果是同步发送会阻塞&#xff0c;如果是异…...

机器学习笔记 计算机视觉中的测距任务常见技术路线

一、计算机视觉中的测距任务 测距是计算机视觉中的一项关键任务,涉及测量物体和相机之间的距离。这些信息可用于多种应用,包括机器人、自动驾驶汽车和增强现实。测距技术有很多种,包括主动式和被动式,每种技术都有自己的优点和局限性。主动测距技术,例如飞行时间、结构光和…...

云计算 3月8号 (wordpress的搭建)

项目wordpress 实验目的&#xff1a; 熟悉yum和编译安装操作 锻炼关联性思维&#xff0c;便于以后做项目 nginx 编译安装 1、安装源码包 [rootlinux-server ~]# yum -y install gcc make zlib-devel pcre pcre-devel openssl-devel [rootlinux-server ~]# wget http://nginx.…...

【CSS】(浮动定位)易忘知识点汇总

浮动特性 加了浮动之后的元素,会具有很多特性,需要我们掌握的. 1、浮动元素会脱离标准流(脱标&#xff1a;浮动的盒子不再保留原先的位置) 2、浮动的元素会一行内显示并且元素顶部对齐 注意&#xff1a; 浮动的元素是互相贴靠在一起的&#xff08;不会有缝隙&#xff09;&…...

Vitual Box虚拟机打开后,键盘鼠标失效

Vitual Box虚拟机打开后&#xff0c;键盘鼠标失效 作者在使用Vitual Box虚拟机软件时&#xff0c;偶然发现打开VitualBox后&#xff0c;鼠标和键盘均无法使用。 你以为是“主机热键”引起的&#xff1f;NO&#xff01; 废话少说 直接上干货&#xff1a; 在VitualBox设置下有…...

宠物空气净化器值得入手吗?选购宠物空气净化器关注哪些方面?

一开始养猫时&#xff0c;每天看着可爱的猫咪在家里快乐奔跑&#xff0c;让人心情愉悦。然而&#xff0c;作为铲屎官都知道&#xff0c;猫咪会掉毛&#xff0c;特别是在换毛期间&#xff0c;地板、沙发上都会有一大堆猫毛&#xff0c;甚至衣服也可能沾满猫毛。养猫家庭中&#…...

前端发起请求,后端模型需处理很久,怎样设置前端直接完成请求响应,后端计算完在返回结果给前端?

在这种情况下&#xff0c;可以采用异步处理的方式来解决。具体步骤如下&#xff1a; 前端发起请求&#xff1a;前端向后端发送请求&#xff0c;但是不等待后端处理完成而是立即得到响应。 后端异步处理&#xff1a;后端接收到请求后&#xff0c;不立即进行处理&#xff0c;而是…...

DDD领域驱动设计

一、什么是领域驱动设计DDD 领域驱动设计&#xff08;Domain-Driven Design&#xff0c;DDD&#xff09;是一种软件开发方法论&#xff0c;它提出了一组关于如何设计和构建软件系统的原则和方法。 二、DDD的诞生是为了解决哪些问题 对复杂业务领域的理解不足&#xff1a;传统…...

网络编程第1天

OSI的七层网络模型有哪些&#xff0c;每一层有什么作用&#xff1f; &#xff08;1&#xff09;应用层 负责处理不同应用程序之间的通信&#xff0c;需要满足提供的协议&#xff0c;确保数据发送方和接收方的正确 应用层提供的协议&#xff1a; HTTP&#xff1a;超文本传输…...

Springboot--整合Logback 日志框架(Maven)

文章目录 前言一、Logback 日志框架介绍&#xff1a;二、整合&#xff1a;2.1 引入jar2.2 logback.xml 文件配置&#xff1a;2.3 日志输出&#xff1a;2.3.1 方式一&#xff1a;2.3.2 方式二&#xff1a; 2.3 日志输出结果展示&#xff1a; 三、扩展&#xff1a;3.1 日志输出格…...

【考研数学】李林《880》vs 李永乐《660》完美使用搭配

没有说谁一定好&#xff0c;只有适不适合自身情况&#xff0c;针对自身弱点选择性价比才最高。 两者侧重点不同&#xff0c;660适合强化前期&#xff0c;弥补基础的不足&#xff0c;880适合强化后期&#xff0c;题型全面&#xff0c;提高我们对综合运用知识的能力。 选择习题…...

Java面试之消息中间件

消息队列 优缺点 特点 解耦异步削峰缺点 系统可用性降低 兜底:代码中try、catch 异常捕捉后直接进行数据库操作,或者 搭建高可用集群,Kafka集群、RocketMQ集群提高复杂度 消息重复(消费端的幂等性设计)、消息丢失(主要集中RabbitMQ)、消息的顺序(业务:1,下单 2,支付 3,发…...

网工学习 DHCP配置-接口模式

网工学习 DHCP配置-接口模式 学习DHCP总是看到&#xff0c;接口模式、全局模式、中继模式。理解起来也不困难&#xff0c;但是自己动手操作起来全是问号。跟着老师视频配置啥问题没有&#xff0c;自己组建网络环境配置就是不通&#xff0c;悲催。今天总结一下我学习接口模式的…...

【GO】语言特点 | Go和Java的对比

while循环 go语言中没有while循环&#xff0c;一般都是用for循环替代 while (条件) {} // Java的for循环for true {} // go 语言中会用一个为真的表达式作为是否 会进入循环的条件&#xff0c;也就是把其他语言的for和while合并了for循环 for (Type item : list) {} // j…...

USB2.0设备检测过程信号分析

1.简介 USB设备接入的Hub端口负责检测USB2.0设备是否存在和确定USB2.0设备的速度。检测设备是否存在和确定设备速度涉及一系列的信号交互&#xff0c;下面将分析该过程。 2.硬件 USB低速设备和全速/高速设备的连接器在硬件结构上有所不同&#xff0c;而主机或者Hub接收端连接…...

Go语言物联网开发安科瑞ADW300/4G电能表数据上传mqtt平台-电表接线到传输数据完整流程

电能表功能说明 ADW300是方便用户进行用电监测、集抄和管理&#xff0c;可灵活安装在配电箱中&#xff0c;可用于电力运维、环保监管等在线监测类平台中。我们本案例是用于工业售电公司对出售电的管理&#xff0c;设备可以监控用电情况、故障监控及警报&#xff0c;售电公司可…...

LabVIEW质谱仪开发与升级

LabVIEW质谱仪开发与升级 随着科技的发展和实验要求的提高&#xff0c;传统基于VB的质谱仪系统已经无法满足当前的高精度和高效率需求。这些系统通常存在着功能不全和操作复杂的问题&#xff0c;影响了科研和生产的进度。为了解决这些问题&#xff0c;开发了一套基于LabVIEW开…...

SwiftUI之DragGesture

SwiftUI中的DragGesture是一种手势识别器&#xff0c;用于捕捉用户拖拽操作。通过DragGesture&#xff0c;我们可以监听用户在视图上的拖拽行为&#xff0c;并对其进行响应。 在SwiftUI中&#xff0c;我们可以将DragGesture应用于任何视图&#xff0c;以便捕捉拖拽手势。在Dra…...

主网NFT的发布合约

1.什么是nft? NFT:Non-fungible-token 非同质化货币 2.新建suimove项目 使用sui move new 项目名命令新建sui move项目 sui move new nft_qyx项目结构如下: 3.写nft合约 module qyx123::nft{use sui::object::{Self, UID};use sui::transfer;use sui::tx_context::{Sel…...

分享2024年在家轻松兼职赚钱的5个副业

今天在网上看到这么一句话&#xff0c;真的让我深有感触&#xff1a;“职场人一定要有居安思危的意识&#xff0c;创业的人一定要三思而后行”。在这个瞬息万变的时代&#xff0c;连被视为铁饭碗的公务员、教师等体制内工作都不能保证一辈子的稳定。发展副业&#xff0c;似乎成…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...