当前位置: 首页 > article >正文

【kafka实战】05 Kafka消费者消费消息过程源码剖析

1. 概述

Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理等。本文将深入剖析Kafka消费者消费消息的源码,并结合相关原理图进行讲解。

以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简单的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。

1.1 消费者代码使用示例

  • 已经安装并启动了 Kafka 集群。
  • 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

示例代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {private static final String TOPIC_NAME = "test-topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private static final String GROUP_ID = "test-group";public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 键和值的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建 Kafka 消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);try {// 订阅主题consumer.subscribe(Collections.singletonList(TOPIC_NAME));// 持续消费消息while (true) {// 从 Kafka 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}
}

代码解释

  1. 配置消费者属性

    • BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
    • GROUP_ID_CONFIG:指定消费者所属的消费组。
    • ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
    • KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
  2. 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。

  3. 订阅主题:使用 subscribe 方法订阅指定的主题。

  4. 持续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。

  5. 关闭消费者:在消费完成后,使用 close 方法关闭消费者。

注意事项

  • 确保 Kafka 集群的地址和主题名称正确。
  • 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync()commitAsync() 方法手动提交偏移量。

2. Kafka消费者消费消息的核心流程

Kafka消费者消费消息的核心流程可以分为以下几个步骤:

  1. 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
  2. 分区分配:组协调器为消费者分配分区。
  3. 消息拉取:消费者从分配的分区中拉取消息。
  4. 消息处理:消费者处理拉取到的消息。
  5. 提交偏移量:消费者提交已处理消息的偏移量。

下面我们将结合源码详细分析每个步骤。
在这里插入图片描述

3. 源码剖析


关键组件说明

  1. ConsumerCoordinator

    • 负责消费者组的协调和分区分配。
    • 管理消费者的心跳和重平衡。
  2. Fetcher

    • 负责从 Kafka Broker 拉取消息。
    • 管理拉取请求和响应的处理。
  3. SubscriptionState

    • 管理消费者订阅的主题和分区。
    • 记录消费者的消费偏移量。
  4. PartitionAssignor

    • 负责分区分配策略的实现。
  5. OffsetCommitCallback

    • 处理偏移量提交的回调逻辑。

3.1 消费者组协调

消费者在启动时,首先需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。

// org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {// 1. 订阅主题this.subscriptions.subscribe(new HashSet<>(topics), listener);// 2. 加入消费者组coordinator.subscribe(subscriptions);
}

subscribe方法中,消费者首先订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。

3.2 分区分配

组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配策略由PartitionAssignor决定,Kafka提供了多种内置的分区分配策略,如RangeAssignorRoundRobinAssignor等。

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
private void ensurePartitionAssignment() {// 1. 获取分区分配结果Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());// 2. 更新消费者的分区分配subscriptions.assignFromSubscribed(assignments.get(consumerId));
}

ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。

3.3 消息拉取

消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 拉取消息Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);// 2. 返回拉取到的消息return new ConsumerRecords<>(records);
}

poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。

3.4 消息处理

消费者在拉取到消息后,会对消息进行处理。消息处理的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处理等。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 拉取消息Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);// 2. 处理消息for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {for (ConsumerRecord<K, V> record : entry.getValue()) {// 用户自定义的消息处理逻辑processRecord(record);}}// 3. 返回拉取到的消息return new ConsumerRecords<>(records);
}

poll方法中,消费者通过processRecord方法处理每条消息。processRecord方法的具体实现由用户自定义。

3.5 提交偏移量

消费者在处理完消息后,需要提交已处理消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交策略,如自动提交、同步提交、异步提交等。

// org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
public void commitSync() {// 1. 提交偏移量coordinator.commitOffsetsSync(subscriptions.allConsumed());
}

commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交成功。

4. 原理图

以下是Kafka消费者消费消息的核心流程示意图:

+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|  消费者组协调      | ----> |     分区分配       | ----> |     消息拉取       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+|v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|     消息处理       | <---- |     提交偏移量     | <---- |     网络传输       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+

5. 总结

Kafka消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理和偏移量提交。通过源码剖析,我们可以更深入地理解Kafka消费者的工作原理。希望本文能够帮助你更好地理解Kafka消费者的内部机制。

6. 参考

  • Kafka官方文档
  • Kafka源码

相关文章:

【kafka实战】05 Kafka消费者消费消息过程源码剖析

1. 概述 Kafka消费者&#xff08;Consumer&#xff09;是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步骤&#xff0c;包括消费者组的协调、分区分配、消息拉取、消息处理等。本文将深入剖析Kafka消费者消费消息的源码&#xff0c;并结合…...

[EAI-033] SFT 记忆,RL 泛化,LLM和VLM的消融研究

Paper Card 论文标题&#xff1a;SFT Memorizes, RL Generalizes: A Comparative Study of Foundation Model Post-training 论文作者&#xff1a;Tianzhe Chu, Yuexiang Zhai, Jihan Yang, Shengbang Tong, Saining Xie, Dale Schuurmans, Quoc V. Le, Sergey Levine, Yi Ma 论…...

算法与数据结构(字符串相乘)

题目 思路 这道题我们可以使用竖式乘法&#xff0c;从右往左遍历每个乘数&#xff0c;将其相乘&#xff0c;并且把乘完的数记录在nums数组中&#xff0c;然后再进行进位运算&#xff0c;将同一列的数进行相加&#xff0c;进位。 解题过程 首先求出两个数组的长度&#xff0c;…...

DeepSeek从入门到精通:全面掌握AI大模型的核心能力

文章目录 一、DeepSeek是什么&#xff1f;性能对齐OpenAI-o1正式版 二、Deepseek可以做什么&#xff1f;能力图谱文本生成自然语言理解与分析编程与代码相关常规绘图 三、如何使用DeepSeek&#xff1f;四、DeepSeek从入门到精通推理模型推理大模型非推理大模型 快思慢想&#x…...

【Pytorch函数】PyTorch随机数生成全解析 | torch.rand()家族函数使用指南

&#x1f31f; PyTorch随机数生成全解析 | torch.rand()家族函数使用指南 &#x1f31f; &#x1f4cc; 一、核心函数参数详解 PyTorch提供多种随机数生成函数&#xff08;注意&#xff1a;无直接torch.random()函数&#xff09;&#xff0c;以下是常用函数及参数&#xff1a;…...

vue print 打印

vue 点击打印页面部分内容&#xff0c;或者打印弹窗内的内容 打印页面部分内容 <template><div><div id"print"><div class"info"><div class"bx_title">费用报销单<span class"code">NO.<s…...

【异常解决】在idea中提示 hutool 提示 HttpResponse used withoud try-with-resources statement

博主介绍&#xff1a;✌全网粉丝22W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...

【Uniapp-Vue3】UniCloud云数据库获取指定字段的数据

使用where方法可以获取指定的字段&#xff1a; let db uniCloud.database(); db.collection("数据表").where({字段名1:数据, 字段名2:数据}).get({getOne:true}) 如果我们不在get中添加{getOne:true}&#xff0c;在只获取到一个数据res.result.data将会是一个数组&…...

信息科技伦理与道德3-2:智能决策

2.2 智能推荐 推荐算法介绍 推荐系统&#xff1a;猜你喜欢 https://blog.csdn.net/search_129_hr/article/details/120468187 推荐系统–矩阵分解 https://blog.csdn.net/search_129_hr/article/details/121598087 案例一&#xff1a;YouTube推荐算法向儿童推荐不适宜视频 …...

openssl使用

openssl使用 提取密钥对 数字证书pfx包含公钥和私钥&#xff0c;而cer证书只包含公钥。提取需输入证书保护密码 openssl pkcs12 -in xxx.pfx -nocerts -nodes -out pare.key提取私钥 openssl rsa -in pare.key -out pri.key提取公钥 openssl rsa -in pare.key -pubout -ou…...

Visual Studio 2022 中使用 Google Test

要在 Visual Studio 2022 中使用 Google Test (gtest)&#xff0c;可以按照以下步骤进行&#xff1a; 安装 Google Test&#xff1a;确保你已经安装了 Google Test。如果没有安装&#xff0c;可以通过 Visual Studio Installer 安装。在安装程序中&#xff0c;找到并选择 Googl…...

SpringBoot3 + Jedis5 + Redis集群 如何通过scan方法分页获取所有keys

背景: 由于需要升级老项目代码&#xff0c;从SpringBoot1.5.x 升级到 SpringBoot3.3.x&#xff0c;框架中引用的Jedis自动升级到了 5.x&#xff1b;正好代码中有需要获取Redis集群的所有keys的需求存在&#xff1b;代码就不适用了&#xff0c;修改如下&#xff1a; POM 由于…...

WGCLOUD监控系统部署教程

官网地址&#xff1a;下载WGCLOUD安装包 - WGCLOUD官网 第一步、环境配置 #安装jdk 1、安装 EPEL 仓库&#xff1a; sudo yum install -y epel-release 2、安装 OpenJDK 11&#xff1a; sudo yum install java-11-openjdk-devel 3、如果成功&#xff0c;你可以通过运行 java …...

协议-WebRTC-HLS

是什么&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09; 实现 Web 浏览器和移动应用程序之间通过互联网直接进行实时通信。允许点对点音频、视频和数据共享&#xff0c;而无需任何插件或其他软件。WebRTC 广泛用于构建视频会议、语音通话、直播、在线游…...

jQuery UI 下载指南

jQuery UI 下载指南 引言 jQuery UI 是一个基于 jQuery 的用户界面和交互库&#xff0c;它提供了一套丰富的交互组件和视觉效果&#xff0c;可以帮助开发者快速构建美观、交互性强的网页应用。本文将为您详细介绍如何下载 jQuery UI&#xff0c;并指导您进行安装和使用。 jQ…...

MySQL系列之数据类型(String)

导览 前言一、字符串类型知多少 1. 类型说明2. 字符和字节的转换 二、字符串类型的异同 1. CHAR & VARCHAR2. BINARY & VARBINARY3. BLOB & TEXT4. ENUM & SET 结语精彩回放 前言 MySQL数据类型第三弹闪亮登场&#xff0c;欢迎关注O。 本篇博主开始谈谈MySQ…...

Kotlin 2.1.0 入门教程(十)if、when

if 表达式 if 是一个表达式&#xff0c;它会返回一个值。 不存在三元运算符&#xff08;condition ? then : else&#xff09;&#xff0c;因为 if 在这种场景下完全可以胜任。 var max aif (a < b) max bif (a > b) {max a } else {max b }max if (a > b) a…...

编程式路由

<script> export default {name: video-Info1,created () {setTimeout(() > {this.$router.push({ name: home })}, 3000)} } </script> 编程式路由&#xff1a;实现 不需要用户点击router-link&#xff0c;由代码实现路由跳转。 应用场景&#xff1a;用户登录…...

openAI官方prompt技巧(一)

1. 使用最新的模型 2. 将指令放在提示词的开头&#xff0c;并使用 ### 或 """ 来分隔指令和上下文&#xff0c;例如 错误示范❌ 将下面的文本总结为一个要点列表&#xff0c;列出最重要的内容。 Summarize the text below as a bullet point list of the most…...

利用 Python 爬虫获取按关键字搜索淘宝商品的完整指南

在电商数据分析和市场研究中&#xff0c;获取商品的详细信息是至关重要的一步。淘宝作为中国最大的电商平台之一&#xff0c;提供了丰富的商品数据。通过 Python 爬虫技术&#xff0c;我们可以高效地获取按关键字搜索的淘宝商品信息。本文将详细介绍如何利用 Python 爬虫技术获…...

LeetCode 0080.删除有序数组中的重复项 II:双指针 - C++/Java5 行版

【LetMeFly】80.删除有序数组中的重复项 II&#xff1a;双指针 - C/Java5 行版 力扣题目链接&#xff1a;https://leetcode.cn/problems/remove-duplicates-from-sorted-array-ii/ 给你一个有序数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使得出现次数超…...

【C++高并发服务器WebServer】-15:poll、epoll详解及实现

本文目录 一、poll二、epoll2.1 相对poll和select的优点2.2 epoll的api2.3 epoll的demo实现2.5 epoll的工作模式 一、poll poll是对select的一个改进&#xff0c;我们先来看看select的缺点。 我们来看看poll的实现。 struct pollfd {int fd; /* 委托内核检测的文件描述符 */s…...

MapReduce是什么?

MapReduce 是一种编程模型&#xff0c;最初由 Google 提出&#xff0c;旨在处理大规模数据集。它是分布式计算的一个重要概念&#xff0c;通常用于处理海量数据并进行并行计算。MapReduce的基本思想是将计算任务分解为两个阶段&#xff1a;Map 阶段和 Reduce 阶段。 Map 阶段&a…...

git提交到GitHub问题汇总

1.main->master git默认主分支是maser&#xff0c;如果是按照这个分支名push&#xff0c;GitHub会出现两个branch&#xff0c;与预期不符 解决方案&#xff1a;更改原始主分支名为main git config --global init.defaultBranch main2.git&#xff1a;OpenSSL SSL_read: SS…...

CNN-GRU卷积神经网络门控循环单元多变量多步预测,光伏功率预测(Matlab完整源码和数据)

代码地址&#xff1a;CNN-GRU卷积神经网络门控循环单元多变量多步预测&#xff0c;光伏功率预测&#xff08;Matlab完整源码和数据) CNN-GRU卷积神经网络门控循环单元多变量多步预测&#xff0c;光伏功率预测 一、引言 1.1、研究背景和意义 随着全球能源危机和环境问题的日…...

编译原理面试问答

编译原理面试拷打 1.编译原理的基本概念 编译原理是研究如何将高级程序语言转换为计算机可执行代码的理论与技术&#xff0c;其核心目标是实现高效、正确的代码翻译。 **编译器&#xff1a;**将源代码转化为目标代码&#xff08;机器码、字节码等&#xff09;。一次翻译整个程…...

LIMO:上海交大的工作 “少即是多” LLM 推理

25年2月来自上海交大、SII 和 GAIR 的论文“LIMO: Less is More for Reasoning”。 一个挑战是在大语言模型&#xff08;LLM&#xff09;中的复杂推理。虽然传统观点认为复杂的推理任务需要大量的训练数据&#xff08;通常超过 100,000 个示例&#xff09;&#xff0c;但本文展…...

Java 魔法:精准掌控 PDF 合同模板,指定页码与关键字替换签章日期

朋友们&#xff01;在实际业务场景中&#xff0c;经常会碰到处理 PDF 合同模板的需求&#xff0c;要在几十页的合同里对指定页面替换公章、签名和日期&#xff0c;还涉及多人签名以及多个公司盖公章。下面就给大家分享两种用 Java 处理这类问题的方法&#xff0c;一种是通过指定…...

Ollama 部署本地大语言模型

一、下载安装ollama 1.百度 ollama Ollama 2.点击下载 可以复制下载链接&#xff0c;使用下载器下载。 3.双击安装 默认安装目录&#xff1a;C:\Users\用户名\AppData\Local\Programs\Ollama 二、更改模型下载目录 0.默认下载目录 (跳过) 之前没下载过模型&#xff0c;不…...

Jackson扁平化处理对象

POJO对象 Data public class People {private PeopleInfo peopleInfo;private List<String> peopleIds;private Map<String, String> peopleMap;Datapublic static class PeopleInfo {private String name;private String address;} }JSON序列化处理 直接将对象进…...