当前位置: 首页 > article >正文

聊聊 Pulsar:Producer 源码解析

一、前言

Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者负责将消息写入指定的 Topic,从而启动整个消息流转的生命周期。

深入解析 Pulsar Producer 的源码,不仅可以了解消息生产的内部实现机制,还能帮助开发者优化客户端性能,同时为调试和扩展提供基础。在本文中,我们将从 Producer 的创建流程、消息发送逻辑以及其与 Broker 的交互机制出发,逐步剖析源码细节,以期为读者提供全面的技术视角和实践指导。

二、Producer测试类

老规矩,我们先以Producer生产下消息,来跟进Producer的相关源码流程。

@Test
public void testSimpleProducerEvents() throws Exception {final String topicName = "persistent://prop/ns-abc/topic0";// 1. producer connectProducer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();assertNotNull(topicRef);assertEquals(topicRef.getProducers().size(), 1);// 2. producer publish messagesfor (int i = 0; i < 10; i++) {String message = "my-message-" + i;producer.send(message.getBytes());}rolloverPerIntervalStats();assertTrue(topicRef.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);// 3. producer disconnectproducer.close();Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);assertEquals(topicRef.getProducers().size(), 0);
}

从上面的代码可以看出 Pulsar 为用户提供了非常简洁方便的 API,在使用时,只需要如下两步:

  • 创建 Pulsar Producer 实例
  • 调用 send 接口发送数据

三、Pulsar Producer 实例化

3.1 实例化ProducerBuilder
在这里插入图片描述
核心在 org.apache.pulsar.client.impl.PulsarClientImpl#createProducerAsync(org.apache.pulsar.client.impl.conf.ProducerConfigurationData, org.apache.pulsar.client.api.Schema, org.apache.pulsar.client.impl.ProducerInterceptors)

用于异步创建消息生产者(Producer)。下面是对代码的详细分析:

  • 泛型 : 表示生产者的消息类型。
  • CompletableFuture<Producer>: 返回一个 CompletableFuture,表示生产者的异步创建过程。
    参数:
    • ProducerConfigurationData conf: 生产者的配置数据。
    • Schema schema: 消息的模式(Schema)。
    • ProducerInterceptors interceptors: 生产者的拦截器列表。
  • 检查模式是否为 AutoProduceBytesSchema:
    • 如果 schema 是 AutoProduceBytesSchema,并且已经初始化,直接调用 createProducerAsync 方法创建生产者。

    • 如果 schema 是 AutoProduceBytesSchema,但未初始化,先从 Pulsar 服务器获取主题的模式信息:

      • 如果模式信息存在,设置 AutoProduceBytesSchema 的模式。
      • 如果模式信息不存在,设置 AutoProduceBytesSchema 的模式为 Schema.BYTES。
    • 最后调用 createProducerAsync 方法创建生产者。

    • 其他情况: 直接调用 createProducerAsync 方法创建生产者。

3.2 异步创建消息生产者

/*** 异步创建消息生产者* @param topic 主题名称* @param conf 生产者的配置数据* @param schema 消息的模式(Schema)* @param interceptors 生产者的拦截器列表* @return 返回一个 CompletableFuture,表示生产者的异步创建过程。* @param <T> 生产者的消息类型*/
private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,ProducerConfigurationData conf,Schema<T> schema,ProducerInterceptors interceptors) {CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();// 取主题的分区元数据getPartitionedTopicMetadata(topic).thenAccept(metadata -> {if (log.isDebugEnabled()) {log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);}// 创建生产ProducerBase<T> producer;if (metadata.partitions > 0) {// 分区主题: 如果 metadata.partitions 大于 0,表示这是一个分区主题,调用 newPartitionedProducerImpl 方法创建分区生产者。producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture,metadata);} else {// 非分区主题: 否则,调用 newProducerImpl 方法创建普通生产者。producer = newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture);}// 将创建的生产者添加到 producers 集合中。producers.add(producer);}).exceptionally(ex -> {log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage());producerCreatedFuture.completeExceptionally(ex);return null;});return producerCreatedFuture;
}

3.2.1 获取主题的分区元数据

在这里插入图片描述
3.2.2 创建分区生产者

public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {super(client, topic, conf, producerCreatedFuture, schema, interceptors);this.producers = new ConcurrentOpenHashMap<>();this.topicMetadata = new TopicMetadataImpl(numPartitions);this.routerPolicy = getMessageRouter();stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;// MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly// 计算最大待处理消息数: 根据配置计算每个分区的最大待处理消息数。int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);conf.setMaxPendingMessages(maxPendingMessages);// 确定要创建的分区索引列表: 根据配置确定要创建的分区索引列表。final List<Integer> indexList;// 懒启动且共享访问模式: 只创建一个分区的生产者,选择的分区由消息路由策略决定。if (conf.isLazyStartPartitionedProducers() &&conf.getAccessMode() == ProducerAccessMode.Shared) {// try to create producer at least one partitionindexList = Collections.singletonList(routerPolicy.choosePartition(((TypedMessageBuilderImpl<T>) newMessage()).getMessage(), topicMetadata));} else {// try to create producer for all partitionsindexList = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toList());}// 设置第一个分区索引: 设置第一个分区的索引。firstPartitionIndex = indexList.get(0);// 启动生产者: 启动指定分区索引列表中的生产者。start(indexList);// start track and auto subscribe partition increasement// 自动更新分区: 如果配置了自动更新分区,则创建监听器并启动定时任务定期检查和更新分区。if (conf.isAutoUpdatePartitions()) {topicsPartitionChangedListener = new TopicsPartitionChangedListener();partitionsAutoUpdateTimeout = client.timer().newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);}
}

核心在start方法启动生产者:

在这里插入图片描述
创建指定分区索引的生产者:

// 键为分区索引,值为 ProducerImpl 实例。
private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;private ProducerImpl<T> createProducer(final int partitionIndex) {return producers.computeIfAbsent(partitionIndex, (idx) -> {String partitionName = TopicName.get(topic).getPartition(idx).toString();// 创建一个新的 ProducerImpl 实例,传入分区名称、分区索引、配置、模式、拦截器和一个 CompletableFuture。return client.newProducerImpl(partitionName, idx,conf, schema, interceptors, new CompletableFuture<>());});
}protected <T> ProducerImpl<T> newProducerImpl(String topic, int partitionIndex,ProducerConfigurationData conf,Schema<T> schema,ProducerInterceptors interceptors,CompletableFuture<Producer<T>> producerCreatedFuture) {return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, partitionIndex, schema,interceptors);
}

老周对比了创建非分区生产者,创建分区生产者前面明显比创建非分区生产者封装的好,老周当时猜测肯定后面有个公共的方法调到创建非分区生产者里来。下面果然验证了老周的想法:
在这里插入图片描述

3.2.3 创建非分区生产者

请添加图片描述

主要看下grabCnx方法建立与 Broker 的连接

3.2.4 建立与 Broker 的连接

protected void grabCnx() {// 检查客户端连接是否已设置if (CLIENT_CNX_UPDATER.get(this) != null) {log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", state.topic, state.getHandlerName());return;}// 检查当前状态是否允许重新连接if (!isValidStateForReconnection()) {// Ignore connection closed when we are shutting downlog.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());return;}try {state.client.getConnection(state.topic) // 获取与指定主题的连接.thenAccept(cnx -> connection.connectionOpened(cnx)) // 处理连接打开事件.exceptionally(this::handleConnectionError);} catch (Throwable t) {log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t);reconnectLater(t);}
}

在这里插入图片描述

真正核心创建链接在这里:org.apache.pulsar.client.impl.ConnectionPool#createConnection(java.net.InetSocketAddress, java.net.InetSocketAddress, int)

在这里插入图片描述
其实 Pulsar 的 Producer 与 Broker 创建连接和 Kafka 的如出一辙,TCP都是走的Netty那一套。

3.3 Producer 实例化时序图

在这里插入图片描述

四、调用 send 接口发送数据

主要看 producer.send(message.getBytes());

4.1 newMessage方法
在这里插入图片描述

4.2 value方法
在这里插入图片描述
4.3 send方法
在这里插入图片描述

在这里插入图片描述
发送的核心方法在:org.apache.pulsar.client.impl.ProducerImpl#internalSendWithTxnAsync

@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {if (txn == null) {return internalSendAsync(message);} else {return ((TransactionImpl) txn).registerProducedTopic(topic).thenCompose(ignored -> internalSendAsync(message));}
}

继续跟到internalSendAsync方法里去:

在这里插入图片描述

继续跟到sendAsync方法里去:

在这里插入图片描述
继续跟到serializeAndSendMessage方法里去:

在这里插入图片描述

最后跟到processOpSendMsg处理操作并发送消息方法里去,走的Netty通信把消息发送到Broker里去。

在这里插入图片描述
这样分析下来,也就比较清晰了。下面老周再给大家看下整体的时序图,从创建 Pulsar Producer 实例到调用 send 接口发送数据的全生命周期。

在这里插入图片描述

相关文章:

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中&#xff0c;接口是一种抽象类型&#xff0c;它定义了一组方法的集合&#xff1a; // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的&#xff1a; // 矩形结构体…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

Opencv中的addweighted函数

一.addweighted函数作用 addweighted&#xff08;&#xff09;是OpenCV库中用于图像处理的函数&#xff0c;主要功能是将两个输入图像&#xff08;尺寸和类型相同&#xff09;按照指定的权重进行加权叠加&#xff08;图像融合&#xff09;&#xff0c;并添加一个标量值&#x…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...