当前位置: 首页 > news >正文

Kafka - 3.x Kafka消费者不完全指北

文章目录

  • Kafka消费模式
  • Kakfa消费者工作流程
    • 消费者总体工作流程
    • 消费者组原理
    • 消费者组初始化流程
    • 消费者组详细消费流程
  • 独立消费者案例(订阅主题)
  • 消费者重要参数

在这里插入图片描述


Kafka消费模式

Kafka的consumer采用pull(拉)模式从broker中读取数据。

模式优点缺点
Push(推)模式- 快速传递消息
- 消息发送速率由broker决定
- 难以适应不同消费者的消费速率
- 可能导致拒绝服务和网络拥塞
Pull(拉)模式- 可以根据消费者的消费能力以适当速率消费消息- 潜在的循环问题,如果Kafka没有数据,消费者可能会一直返回空数据
- 需要设置轮询的timeout以避免无限等待时长过长

Kakfa消费者工作流程

消费者总体工作流程

Kafka消费者的总体工作流程包括以下步骤:

  1. 配置消费者属性:首先,你需要配置消费者的属性,包括Kafka集群的地址、消费者组、主题名称、序列化/反序列化器、自动偏移提交等。

  2. 创建消费者实例:使用配置创建Kafka消费者实例。

  3. 订阅主题:使用消费者实例订阅一个或多个Kafka主题。这告诉Kafka消费者你想要从哪些主题中接收消息。

  4. 轮询数据:消费者使用poll()方法从Kafka broker中拉取消息。它会定期轮询(拉)Kafka集群以获取新消息。

  5. 处理消息:一旦从Kafka broker获取到消息,消费者会对消息进行处理,执行你的业务逻辑。这可能包括数据处理、计算、存储或其他操作。

  6. 提交偏移量:消费者可以选择手动或自动提交偏移量,以记录已处理消息的位置。这有助于防止消息重复处理。

  7. 处理异常:处理消息期间可能会出现异常,你需要处理这些异常,例如重试或记录错误日志。

  8. 关闭消费者:在不再需要消费者实例时,确保关闭它以释放资源。

在这里插入图片描述
这个工作流程涵盖了Kafka消费者从配置到数据处理再到资源管理的主要步骤。消费者通常是多线程或多进程的,以处理大量的消息,并能够根据需要调整消费速率。此外,Kafka的消费者库提供了很多功能,如自动负载均衡、自动偏移管理等,以简化消费者的开发和维护。


消费者组原理

Kafka消费者组(Consumer Group)是一种机制,用于协调和管理多个消费者并共同消费一个或多个Kafka主题的消息。消费者组的工作原理如下:

  1. 多个消费者:一个消费者组可以包含多个消费者实例,这些消费者实例协同工作以共同消费一个或多个主题的消息。

  2. 订阅主题:所有消费者实例都订阅相同的Kafka主题。这意味着每个消息都会被消费者组中的一个实例处理,从而实现消息的负载均衡。

  3. 消息分区:每个Kafka主题通常被分为多个分区,每个分区包含消息的一个子集。每个消费者实例负责消费一个或多个分区的消息。

  4. 协调者:消费者组中的消费者实例会选择一个协调者(Coordinator)来管理组内的消费者。协调者通常是ZooKeeper或Kafka自身的一个特殊主题。

  5. 偏移管理:协调者负责管理消费者组的偏移量(offset),这是消费者在主题分区中的当前位置。它会跟踪每个分区的消费进度,确保不会重复消费消息。

  6. 分配分区:协调者会定期重新分配分区给消费者实例,以确保负载均衡和故障恢复。如果有新消费者加入组或有消费者离开组,协调者会重新分配分区。

  7. 消费消息:每个消费者实例负责处理分配给它的分区中的消息。它会拉取消息,进行处理,并将偏移量提交给协调者。

  8. 自动重平衡:如果消费者实例加入或退出消费者组,或者分区的分配发生变化,消费者组会自动进行重新平衡,以确保消息均匀分配。

  9. 提交偏移量:消费者实例可以定期或根据需要提交已处理消息的偏移量,以便在故障时恢复消费进度。

在这里插入图片描述

在这里插入图片描述

通过这种方式,Kafka消费者组能够实现高可用性、负载均衡和容错,允许多个消费者并行处理消息,并根据需求动态调整分区分配。这使得消费者组成为了处理大规模流式数据的理想工具。


消费者组初始化流程

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


消费者组详细消费流程

Kafka消费者组的初始化流程包括一系列步骤,用于创建和配置消费者组的成员。以下是Kafka消费者组的初始化流程:

  1. 引入Kafka客户端库:首先,确保你的应用程序中引入了Kafka客户端库,以便能够使用Kafka相关的类和功能。

  2. 创建消费者配置:初始化消费者组前,需要创建一个消费者配置对象,其中包括了一些重要的属性,例如Kafka集群的地址、消费者组的ID、自动提交偏移量等。

  3. 创建消费者实例:使用消费者配置,创建一个或多个消费者实例。每个实例代表一个消费者组中的一个成员。实例会自动注册到Kafka broker,并与协调者建立连接。

  4. 订阅主题:通过消费者实例,使用subscribe()方法订阅一个或多个Kafka主题。这告诉Kafka你希望从哪些主题中接收消息。

  5. 启动消费者:调用poll()方法开始轮询消息。这将启动消费者实例并开始拉取消息。消费者组中的每个成员都会独立执行这个步骤。

  6. 消费消息:一旦消息被拉取,消费者实例会处理这些消息,执行你的业务逻辑。每个成员在自己的线程中处理消息。

  7. 提交偏移量:消费者实例可以选择手动或自动提交已处理消息的偏移量。这有助于记录每个分区中消息的处理进度。

  8. 处理异常:处理消息期间可能会出现异常,你需要适当地处理这些异常,例如重试消息或记录错误日志。

  9. 关闭消费者:当不再需要消费者实例时,确保关闭它以释放资源。

  10. 自动重平衡:如果有消费者实例加入或离开消费者组,或者分区的分配发生变化,Kafka会自动进行重新平衡,以确保消息均匀分配。

在这里插入图片描述

这个初始化流程涵盖了Kafka消费者组的基本步骤,从配置消费者组成员到消息的处理和消费。请注意,Kafka消费者组的初始化需要注意各个配置选项以及消费者组的协调过程,以确保正常运行和负载均衡。


独立消费者案例(订阅主题)

需求:创建一个独立消费者,消费artisan主题中的数据

注意:在消费者API代码中必须配置消费者组id。

package com.artisan.pc;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "artisan-group");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 4. 订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("artisan");consumer.subscribe(topics);// 5. 拉取数据打印while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 6. 遍历并输出消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

① 在IDEA中执行消费者程序
② 服务器上中创建kafka生产者,并输入数据

在这里插入图片描述

③ 在IDEA中观察接收到的数据

ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 34, CreateTime = 1698630425187, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = first message)
ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 35, CreateTime = 1698630429909, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = seconde message)

消费者重要参数

参数名称描述
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
key.deserializer指定接收消息的key的反序列化类型。需要写全类名。
value.deserializer指定接收消息的value的反序列化类型。需要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms若enable.auto.commit=true,表示消费者提交偏移量的频率,默认为5秒。
auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在时的处理方式。可选值包括"earliest"、“latest”、“none”、
offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认为3秒。必须小于session.timeout.ms,也不应该高于session.timeout.ms的1/3。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认为45秒。超过该值,消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认为5分钟。超过该值,消费者被移除,消费者组执行再平衡。
fetch.min.bytes消费者获取服务器端一批消息最小的字节数,默认为1个字节。
fetch.max.wait.ms默认为500毫秒。如果没有从服务器端获取到一批数据的最小字节数,等待时间到,仍然会返回数据。
fetch.max.bytes默认为52428800(50兆字节)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值,仍然可以拉取回这批数据,这不是一个绝对最大值,一批次的大小受message.max.bytes(broker配置)或max.message.bytes(topic配置)影响。
max.poll.records一次poll拉取数据返回消息的最大条数,默认为500条。

在这里插入图片描述

相关文章:

Kafka - 3.x Kafka消费者不完全指北

文章目录 Kafka消费模式Kakfa消费者工作流程消费者总体工作流程消费者组原理消费者组初始化流程消费者组详细消费流程 独立消费者案例&#xff08;订阅主题&#xff09;消费者重要参数 Kafka消费模式 Kafka的consumer采用pull&#xff08;拉&#xff09;模式从broker中读取数据…...

Gerrit | 重磅! 2.x 版本升级到 3.x 版本----转

Gerrit | 重磅! 2.x 版本升级到 3.x 版本 为什么要做版本升级&#xff1f; 2.x known bugs 重大问题不一一列举&#xff0c;这里仅仅是举几个例子&#xff1a; 安全或权限问题&#xff1a;普通用户能看到敏感数据&#xff0c;例如看到其他用户的 hashed api 密码&#xff0c…...

使用c++编程语言,用递归的方法求第n个斐波那契数,代码如下

#include<iostream> using namespace std;int fib_1(int n) {if (n < 1){return n;}return fib_1(n - 1) fib_1(n - 2); }int main() {cout << fib_1(6);return 0; }...

git config pull.rebase false

git pull 默认使用merge 可以使用 git pull --rebase 命令使用rebase 或者配置 git config pull.rebase true 使 git pull命令执行 git pull --rebase git config pull.rebase false 的作用是设置 Git 在执行 git pull 命令时默认使用 merge 而不是 rebase。 git pull 命…...

Spring面试题:(一)IoC,DI,AOP和BeanFactory,ApplicationContext

IoC&#xff0c;DI&#xff0c;AOP思想 IOC就是控制反转&#xff0c;是指创建对象的控制权的转移。以前创建对象的主动权和时机是由自己把控的&#xff0c;而现在这种权力转移到Spring容器中&#xff0c;并由容器根据配置文件去创建实例和管理各个实例之间的依赖关系。对象与对…...

RabbitMQ如何保证消息不丢失呢?

RabbitMQ 是一个流行的消息队列系统&#xff0c;用于在分布式应用程序之间传递消息。要确保消息不会丢失&#xff0c;可以采取以下一些措施&#xff1a; 持久化消息&#xff1a; RabbitMQ 允许你将消息标记为持久化的。这意味着消息将被写入磁盘&#xff0c;即使 RabbitMQ 服务…...

VR步进式漫游,轻松构建三维模型,带来展示新形式!

引言&#xff1a; 虚拟现实&#xff08;Virtual Reality&#xff0c;简称VR&#xff09;已经成为当今科技领域的一项创新力量&#xff0c;它正在逐渐渗透到不同的领域&#xff0c;其中步进式漫游是VR技术的一项重要应用&#xff0c;它能在各个行业的宣传中发挥重要作用。 一&a…...

英语——分享篇——常用人物身份

常用人物身份 家庭成员类 father 父亲 mother 母亲 grandmother&#xff08;外&#xff09;祖母 grandfather&#xff08;外&#xff09;祖父 son 儿子 daughter 女儿 uncle 叔叔&#xff0c;舅舅 aunt 婶母&#xff0c;舅母 brother 兄弟 sister 姐妹 nephew 侄子 niece…...

202310-宏基组学物种分析工具-MetaPhlAn4安装和使用方法-Anaconda3- centos9 stream

MetaPhlAn 4是一种基于DNA序列的微生物组分析工具&#xff0c;它能够从宏基因组测序数据中识别和分离微生物的组成。以下是安装和使用MetaPhlAn 4的步骤&#xff1a; 安装MetaPhlAn 4&#xff1a; 裸机环境&#xff0c;手动安装 1. 安装依赖项&#xff1a; MetaPhlAn 4需要…...

systrace/perfetto如何看surfaceflinger的vsync信号方法-android framework实战车载手机系统开发

背景&#xff1a; hi&#xff0c;粉丝朋友们&#xff1a; 大家好&#xff01;近期分享了surfaceflinger相关的一些blog&#xff0c;有同学就对相关的一些内容产生了一些疑问。 比如&#xff1a;vsync查看问题&#xff0c;即怎么才可以说是vsync到来了。 比如perfetto中surfac…...

一文带你彻底弄懂js事件循环(Event Loop)

JavaScript事件循环是JavaScript运行时环境中处理异步操作的机制。它允许JavaScript在执行同步代码的同时处理异步任务&#xff0c;以避免阻塞线程并提供更好的用户体验。 本文将在浏览器异步执行原理基础上带你彻底弄懂js的事件循环机制。 浏览器JS异步执行原理 js是单线程…...

数据结构与算法:二叉树之“堆排序”

目录 一、树概念及结构 二、二叉树树概念及结构 特殊的二叉树 三、堆的概念及结构 四、堆的创建 1、声明结构体 2、初始化 3、销毁 4、添加新元素 5、交换元素 6、向上调整 7、判断堆是否为空 8、移除堆顶元素 9、向下调整 10、获取堆元素个数 五、使用堆排序…...

gma 2 教程(三)坐标参考系统:2.基准面/椭球体

安装 gma&#xff1a;pip install gma 地球是一个近似于椭球体的三维物体&#xff0c;而地球上的各种测量和计算都需要一个基准面来进行。基准面是一个虚拟的平面&#xff0c;用于测量和计算地球上的各种物理量。在地球科学中&#xff0c;基准面通常是一个参考椭球体&#xff0…...

【1day】复现广联达-Linkworks 协同办公管理平台信息泄露漏洞

注:该文章来自作者日常学习笔记,请勿利用文章内的相关技术从事非法测试,如因此产生的一切不良后果与作者无关。 目录 一、漏洞描述 二、影响版本 三、资产测绘 四、漏洞复现...

Spring Cloud之ElasticSearch的学习【详细】

目录 ElasticSearch 正向索引与倒排索引 数据库与elasticsearch概念对比 安装ES、Kibana与分词器 分词器作用 自定义字典 拓展词库 禁用词库 索引库操作 Mapping属性 创建索引库 查询索引库 删除索引库 修改索引库 文档操作 新增文档 查找文档 修改文档 全量…...

vscode免密码认证ssh连接virtual box虚拟机

文章目录 安装软件virtual box配置vscode配置创建并传递密钥连接虚拟机最后 安装软件 安装vscode和virtual box&#xff0c;直接官网下载对应软件包&#xff0c;下载之后&#xff0c;点击执行&#xff0c;最后傻瓜式下一步安装即可 virtual box配置 创建一个仅主机网络的网卡 …...

【Linux】Centos yum源替换

YUM是基于RPM包管理&#xff0c;能够从指定的服务器自动下载RPM包并且安装&#xff0c;可以自动处理依赖性关系&#xff0c;并且一次安装所有依赖的软件包&#xff0c;无须繁琐地一次次下载、安装。 CentOS 8操作系统版本结束了生命周期&#xff08;EOL&#xff09;&#xff0…...

uniapp组件初始化的销毁(监听隐藏事件)

onHide是监听隐藏事件onHide() {console.log("销毁");this.clearTimer(); }, onShow(){console.log("初始化");this.getOrderInfo() },...

leetcode:1207. 独一无二的出现次数(python3解法)

难度&#xff1a;简单 给你一个整数数组 arr&#xff0c;请你帮忙统计数组中每个数的出现次数。 如果每个数的出现次数都是独一无二的&#xff0c;就返回 true&#xff1b;否则返回 false。 示例 1&#xff1a; 输入&#xff1a;arr [1,2,2,1,1,3] 输出&#xff1a;true 解释&…...

2023秋《论文写作》课程总结

2023秋《论文写作》课程总结 授课教师为闵帆教授&#xff0c;原文链接《论文写作》 文章目录 2023秋《论文写作》课程总结一、关于写作工具二、关于写作中的单词、短语、语法等三、关于论文题目四、关于摘要和关键词五、关于引言部分六、关于方法及实验部分七、关于结论八、关…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

EtherNet/IP转DeviceNet协议网关详解

一&#xff0c;设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络&#xff0c;本网关连接到EtherNet/IP总线中做为从站使用&#xff0c;连接到DeviceNet总线中做为从站使用。 在自动…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...