当前位置: 首页 > news >正文

【kafka系列】消费者

目录

获取消息

1. 消费者获取消息的流程逻辑分析

阶段一:消费者初始化

阶段二:分区分配与重平衡(Rebalance)

阶段三:消息拉取与处理

阶段四:偏移量提交

核心设计思想

2. 流程

关键点总结

常见参数

一、核心必填参数

二、消费者组与重平衡参数

三、消息拉取与处理参数

四、偏移量(Offset)提交参数

五、错误处理与容错参数

六、高级配置


获取消息

1. 消费者获取消息的流程逻辑分析

Kafka 消费者通过 消费者组(Consumer Group) 协作消费消息,核心流程分为 初始化、分区分配、消息拉取、偏移量提交 四个阶段:


阶段一:消费者初始化
  1. 订阅 Topic
    • 消费者通过 consumer.subscribe() 订阅一个或多个 Topic。
    • 若消费者属于同一消费者组,组内消费者会均分 Topic 的分区
  1. 加入消费者组
    • 消费者启动时向 Broker 发送 JoinGroup 请求,加入消费者组。
    • 若消费者是组内第一个成员,会被选举为 Leader 消费者,负责分区分配。

阶段二:分区分配与重平衡(Rebalance)
  1. 分区分配策略
    • Leader 消费者根据策略(如 RangeAssignorRoundRobinAssignor)分配分区。
    • 分配结果通过 SyncGroup 请求同步给所有消费者。
  1. 重平衡触发条件
    • 消费者加入或离开组。
    • Topic 的分区数量变化。
    • 消费者心跳超时(默认 session.timeout.ms=45s)。

阶段三:消息拉取与处理
  1. 拉取消息
    • 消费者向分区的 Leader Broker 发送 FetchRequest,从当前偏移量(Offset)拉取消息。
    • 关键配置:
      • max.poll.records:单次拉取最大消息数(默认 500)。
      • fetch.min.bytes:最小拉取数据量(默认 1B,优先吞吐量时可调大)。
  1. 处理消息
    • 用户通过 ConsumerRecords 处理消息,需在 max.poll.interval.ms(默认 5分钟)内完成,否则触发重平衡。

阶段四:偏移量提交
  1. 提交 Offset
    • 自动提交:由消费者线程周期性提交(enable.auto.commit=true,默认 5秒)。
    • 手动提交:用户调用 commitSync()commitAsync() 精确控制。
    • Offset 存储在 Kafka 内部 Topic __consumer_offsets 中。

核心设计思想
  • 负载均衡:通过消费者组实现分区并行消费。
  • 容错性:心跳机制检测消费者存活,重平衡保障分区重新分配。
  • 至少一次语义:Offset 提交后移,确保消息至少被消费一次。

2. 流程


关键点总结

  1. 重平衡机制:保障消费者组动态扩展和容错。
  2. Offset 管理:通过提交 Offset 实现消费进度持久化。
  3. 消息拉取优化:通过 fetch.min.bytesmax.poll.records 平衡吞吐与延迟。
  4. 超时控制session.timeout.msmax.poll.interval.ms 防止消费者僵死

常见参数

一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

group.id

消费者组 ID(同一组内的消费者共享分区负载)。

key.deserializer

Key 的反序列化类(如 org.apache.kafka.common.serialization.StringDeserializer

)。

value.deserializer

Value 的反序列化类(同上)。


二、消费者组与重平衡参数

参数名

默认值

说明

session.timeout.ms

45000

(45秒)

消费者与 Broker 的心跳超时时间,超时触发重平衡。

heartbeat.interval.ms

3000

(3秒)

消费者发送心跳的间隔时间(需小于 session.timeout.ms

的 1/3)。

max.poll.interval.ms

300000

(5分钟)

两次 poll()

调用的最大间隔时间,超时触发重平衡。

partition.assignment.strategy

RangeAssignor

分区分配策略(如 RoundRobinAssignor

CooperativeStickyAssignor

)。


三、消息拉取与处理参数

参数名

默认值

说明

fetch.min.bytes

1

(1字节)

单次拉取的最小数据量(Broker 等待足够数据后返回,提升吞吐量)。

fetch.max.bytes

52428800

(50MB)

单次拉取的最大数据量(需小于 Broker 的 message.max.bytes

)。

max.poll.records

500

单次 poll()

返回的最大消息数(避免内存溢出)。

max.partition.fetch.bytes

1048576

(1MB)

单分区单次拉取的最大数据量。


四、偏移量(Offset)提交参数

参数名

默认值

说明

enable.auto.commit

true

是否自动提交 Offset(建议设为 false

,手动提交确保精确控制)。

auto.commit.interval.ms

5000

(5秒)

自动提交 Offset 的时间间隔(enable.auto.commit=true

时生效)。

auto.offset.reset

latest

无初始 Offset 时的策略:<br>- earliest

:从最早消息开始。<br>- latest

:从最新消息开始。


五、错误处理与容错参数

参数名

默认值

说明

isolation.level

read_uncommitted

事务消息隔离级别:<br>- read_committed

:仅读取已提交的事务消息。


六、高级配置

参数名

默认值

说明

client.id

客户端标识(用于监控和日志)。

connections.max.idle.ms

540000

(9分钟)

空闲连接超时时间(Broker 主动关闭超时连接)。

request.timeout.ms

30000

(30秒)

消费者等待 Broker 响应的超时时间。

相关文章:

【kafka系列】消费者

目录 获取消息 1. 消费者获取消息的流程逻辑分析 阶段一&#xff1a;消费者初始化 阶段二&#xff1a;分区分配与重平衡&#xff08;Rebalance&#xff09; 阶段三&#xff1a;消息拉取与处理 阶段四&#xff1a;偏移量提交 核心设计思想 2. 流程 关键点总结 常见参数…...

HackerRank C++面试,中等难度题目 - Attribute Parser

去除字符串首尾的空白字符&#xff08;包括空格、制表符、换行符和回车符&#xff09; void trim(string &s) {size_t start s.find_first_not_of(" \t\n\r");size_t end s.find_last_not_of(" \t\n\r");if (start string::npos) {s ""…...

【ARM】解决ArmDS Fast Models 中部分内核无法上电的问题

1、 文档目标 解决ArmDS Fast Models 中部分内核无法上电的问题。 2、 问题场景 在调用ArmDS的Fast Models中的Cortex-A55的模型&#xff0c;只有Core 0是上电状态&#xff0c;而Core 1处于掉电状态&#xff0c;如图2-1所示&#xff1a; 图2-1 3、软硬件环境 1&#xff09;…...

节目选择器安卓软件编写(针对老年人)

文章目录 需求来源软件界面演示效果源码获取 对爬虫、逆向感兴趣的同学可以查看文章&#xff0c;一对一小班教学&#xff1a;https://blog.csdn.net/weixin_35770067/article/details/142514698 需求来源 由于现在的视频软件过于复杂&#xff0c;某客户想开发一个针对老年人、…...

蓝桥杯之图

图&#xff1a; 对于图来说&#xff0c;重点在于之后的最短路径算法&#xff0c;这边简单做一下了解即可 代码&#xff1a; #include<iostream> #include<string> #include<vector> #include<list> #include<queue> using namespace std; clas…...

中兴光猫修改SN,MAC,修改地区,异地注册,改桥接,路由拨号

前言 请先阅读上一篇博客获取到光猫超级密码电信光猫获取超级密码 电信光猫天翼网关4.0获取超级密码教程 四川电信光猫 中兴 F1855V2 ZXHN F1855V2 telent权限 实战 实测_天翼4.0光猫超级密码-CSDN博客 修改SN-修改地区&#xff0c;光猫异地注册&#xff0c;设置桥接模式&#…...

【kafka系列】Kafka如何保证消息不丢失?

目录 1. 生产者端&#xff1a;确保消息成功发送到Broker 核心机制&#xff1a; 关键步骤&#xff1a; 2. Broker端&#xff1a;持久化与副本同步 核心机制&#xff1a; 关键源码逻辑&#xff1a; 3. 消费者端&#xff1a;可靠消费与Offset提交 核心机制&#xff1a; 关…...

AtCoder Beginner Contest 393 —— E - GCD of Subset 补题 + 题解 python

AtCoder Beginner Contest 393 E - GCD of Subset Problem Statement You are given a sequence A ( A 1 , A 2 , … , A N ) A (A_1, A_2, \dots, A_N) A(A1​,A2​,…,AN​) of length N N N and a positive integer K K K (at most N N N). For each i 1 , 2 , … …...

vue3响应式丢失解决办法(三)

vue3的响应式的理解&#xff0c;与普通对象的区别&#xff08;一&#xff09; vue3 分析总结响应式丢失问题原因&#xff08;二&#xff09; 经过前面2篇文章&#xff0c;知道了响应式为什么丢失了&#xff0c;但是还是碰到了丢失情况&#xff0c;并且通过之前的内容还不能解…...

BY组态:构建灵活、可扩展的自动化系统

引言 在现代工业自动化领域&#xff0c;BY组态&#xff08;Build Your Own Configuration&#xff09;作为一种灵活、可扩展的解决方案&#xff0c;正逐渐成为工程师和系统集成商的首选。BY组态允许用户根据具体需求自定义系统配置&#xff0c;从而优化生产效率、降低成本并提…...

2025 (ISC)²CCSP 回忆录

2025.1.20 广州&#xff0c;周一&#xff0c;我一次性通过了CCSP的考试。 为什么要考证&#xff1f; 个人成长所需 职业热情&#xff1a;做一行爱一行&#xff0c;既然我投入了美好的青春年华到网络安全行业当中&#xff0c;那么对于这个行业最有权威的认证&#xff0c;是肯定…...

强化学习笔记7——DDPG到TD3

前提&#xff1a;基于TD 的方法多少都会有高估问题&#xff0c;即Q值偏大。原因两个&#xff1a;一、TD目标是真实动作的高估。 二&#xff1a;自举法高估。 DDPG 属于AC方法&#xff1a;异策略&#xff0c;适合连续动作空间&#xff0c;因为他的策略网络直接输出的动作&#…...

win10 系统 自定义Ollama安装路径 及模型下载位置

win10 系统 自定义Ollama安装路径 及模型下载位置 由于Ollama的exe安装软件双击安装的时候默认是在C盘&#xff0c;以及后续的模型数据下载也在C盘&#xff0c;导致会占用C盘空间&#xff0c;所以这里单独写了一个自定义安装Ollama安装目录的教程。 Ollama官网地址&#xff1…...

-bash:/usr/bin/rm: Argument list too long 解决办法

问题概述 小文件日志太多导致无法使用rm命令&#xff0c;因为命令行参数列表的长度超过了系统允许的最大值。 需要删除/tmp目录下的所有文件&#xff0c;文件数量比较多。 ls -lt /tmp | wc -l 5682452 解决方法如下&#xff1a; 使用find -exec 遍历&#xff0c;然后执行删…...

内容中台重构企业内容管理流程驱动智能协作升级

内容概要 内容中台作为企业数字化转型的核心基础设施&#xff0c;通过技术架构革新与功能模块整合&#xff0c;重构了传统内容管理流程的底层逻辑。其核心价值在于构建动态化、智能化的内容生产与流转体系&#xff0c;将分散的创作、存储、审核及分发环节纳入统一平台管理。基…...

python实现YouTube关键词爬虫(2025/02/11)

在当今数字化时代&#xff0c;YouTube作为全球最大的视频分享平台之一&#xff0c;拥有海量的视频资源。无论是进行市场调研、内容创作还是学术研究&#xff0c;能够高效地获取YouTube上的相关视频信息都显得尤为重要。今天&#xff0c;我将为大家介绍一个基于Python实现的YouT…...

【效率技巧】怎么做思维导图||数学思维||费曼学习法

目录标题 常见问题&#xff1a;认知误区和建议&#xff1a;思维导图按照功能分类思维导图好处步骤&#xff08;拆解的步骤&#xff09; 常见问题&#xff1a; 1、做好的思维导图浪费时间 2、做简单的思维导图没有效果 认知误区和建议&#xff1a; 1、做思维导图工具&#xf…...

LabVIEW与USB设备开发

开发一台USB设备并使用LabVIEW进行上位机开发&#xff0c;涉及底层驱动的编写、USB通信协议的实现以及LabVIEW与设备的接口设计。本文将详细介绍如何开发USB设备驱动、实现LabVIEW与USB设备的通信以及优化数据传输&#xff0c;帮助用户顺利完成项目开发。下面是一个详细的说明&…...

动态规划LeetCode-416.分割等和子集

给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 示例 1&#xff1a; 输入&#xff1a;nums [1,5,11,5] 输出&#xff1a;true 解释&#xff1a;数组可以分割成 [1, 5, 5] 和 [11] 。 示例 2&…...

云原生(五十五) | ECS中自建数据库迁移到RDS

文章目录 ECS中自建数据库迁移到RDS 一、场景说明 二、ECS中自建数据库迁移到RDS实现步骤 三、 创建wordpress数据库 四、登录ECS导出wordpress数据库 五、返回RDS数据库管理控制台 六、开启外网地址并设置白名单 七、获取RDS外网访问地址 八、重新设置wordpress的wp-…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

蓝桥杯 冶炼金属

原题目链接 &#x1f527; 冶炼金属转换率推测题解 &#x1f4dc; 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V&#xff0c;是一个正整数&#xff0c;表示每 V V V 个普通金属 O O O 可以冶炼出 …...

yaml读取写入常见错误 (‘cannot represent an object‘, 117)

错误一&#xff1a;yaml.representer.RepresenterError: (‘cannot represent an object’, 117) 出现这个问题一直没找到原因&#xff0c;后面把yaml.safe_dump直接替换成yaml.dump&#xff0c;确实能保存&#xff0c;但出现乱码&#xff1a; 放弃yaml.dump&#xff0c;又切…...

leetcode_69.x的平方根

题目如下 &#xff1a; 看到题 &#xff0c;我们最原始的想法就是暴力解决: for(long long i 0;i<INT_MAX;i){if(i*ix){return i;}else if((i*i>x)&&((i-1)*(i-1)<x)){return i-1;}}我们直接开始遍历&#xff0c;我们是整数的平方根&#xff0c;所以我们分两…...