深入Kafka核心设计与实践原理读书笔记第三章消费者
消费者
消费者与消费组
消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。
- 如果有某个主题有4个分区,P0,P1,P2,P3.有两个消费组A和B订阅了这个主题,A消费组有4个消费者,B消费组有2个消费者,那么A消费组中的4个消费者每一个都只会分配到一个分区,而B消费组中的2个消费者会分配到两个分区。
- 如果所有消费者都属于一个消费者,那么所有的消息默认会均匀分配给每一个消费者。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者。
PS:再均衡动作:解释一下名词,指的是当一个主题中有6个分区时,有一个消费组,这个消费组中只有一个消费者,那么主题中的6个分区的消息都会由同一个消费者来消费,当有一个新的消费者加入这个消费组之后,6个主题中会有3个分配个新的消费者,依次类推,这个动作被称为再均衡动作
必要参数说明
kafka消费者客户端有个4个必填参数
- bootstrapp.service:该参数的释义和生产者客户端的相同,用来指定链接kafka集群所需要的broker地址清单。
- group.id:消费者隶属的消费组名称,默认为""
- key.deserializer和value.deserializer与生产者相同。
其他重要参数 - fetch.min.bytes:配置消费者在一次的poll中拉取的最小数据量 默认 1b
- fetch.max.bytes:配置消费者在一次的poll中拉取的最大数据量默认50MB.
- fetch.max.wait.ms :参数用于指定 Kafka 的等待时间,默认值为 500 )
- exclude.internal.topics:Kafka 中有两个内部的主题:一consumer_offsets tr ansaction state o exclude.internal.topics用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。
- receive.buffer.bytes:这个参数用来设置 Socket 接收消息缓冲区的大小,默认值为 65536 (B) 如果设置为 -1,则使用操作系统的默认值。
- send.buffer.bytes:,这个参数用来设置 Socket 发送消息缓冲区的大小默认值为 13 1072 (B) ’
- request.timeout.ms: 这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000 ms )。
- metadata.max.age.ms: 这个参数用来配置元数据的过期时间,默认值为 300000 ms ),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker 加入。
- reconnect.backoff.ms 这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间〉,避免频繁地连接主机,默认值为 50 ms )。
订阅主题与分区
订阅主题通过subscribe()方法来订阅一个主题,可以是集合订阅多个主题,也可以是正则。
public void subscribe(Collection<String> topics,ConsumerRebalanceListenergy listener);
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern ,ConsumerRebalanceListenergy listener);
public void subscribe(Pattern pattern);
-
如果前后调用两次 subscribe方法 那么以后一次的为准。
PS:ConsumerRebalanceListenergy listener 是用来设置相应的再均衡监听器 -
这里还可以通过assign()方法来指定主题中特定的分区来定义。
public void assign(Collection<TopicPartition> partition);
- 其中 partition是分区的集合。
- TopicPartition类有两种属性 topic和partition,分别代表分区所属的主题和自己的分区偏移量也就是编号。
- 通过partitionsFor(String topic)方法可以查询主题有多少个分区
取消订阅
- unsubscribe()方法取消订阅主题
- subscribe(new ArrayList<>());
- assign(new ArrayList<>());
以上都可
反序列化
对应生产者的序列化器相反,用来把序列化的内容反序列化,至于序列化与反序列化请自行百度,基础概念不与重复。
消息消费
Kafka中消费方式采取的拉去式消费:消息的消费一般分为两种:拉取式和推送式。
- kefka中的消息消费是一个不断轮询的过程。需要重复的效用poll方法。
public ConsumerRecords<K,V> poll(final Duration timeout);
- 其中timeOut 是用来限制poll方法的阻塞时间的
其中 Duration 也有Long的方法,Long的timeOut是毫秒值,Duration 可以通过ofMillis、ofSeconds、ofMinutes
、ofHours等方法来指定不同时间类型。
ConsumerRecords类中还会提供一个方便开发人员用来对消息进行处理的:count()等 如有兴趣自定查看。
位移提交
offset偏移量也叫位移,消费者可以通过offset来指定消费分区中的某个消息所在的位置。
- 每次调用poll方法返回的是未被消费的消息集,偏移量不仅要保存在内存中也要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移,如果有新的消费者加入,那么必然会有再均衡动作,那么新加入的消费者也无法知晓之前的消费位移
- 在旧消费者客户端中消费者偏移量存储在zk中,新版本存放在kafka的主题_consumer_offsets中,这个把偏移量存储起来的动作就时提交。
控制或关闭消费
KafkaConsumer提供了对消费速度进行控制的方法。使用pause()方法resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区想客户端返回数据的操作。
指定位置消费
对应消费位移,主要用在消费者重启之后出发了再均衡动作之后指定偏移量消费分区内消息。
消费者拦截器
对应生产者消费器,主要在消费到消息或提交消费位移的时候进行一些定制化操作。
相关文章:

深入Kafka核心设计与实践原理读书笔记第三章消费者
消费者 消费者与消费组 消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。 如…...

IDEA 中使用 Git 图文教程详解
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...

【Linux系统】进程概念
目录 1 冯诺依曼体系结构 2 操作系统(Operator System) 概念 设计OS的目的 定位 总结 系统调用和库函数概念 3 进程 3.1 基本概念 3.2 描述进程-PCB 3.2 组织进程 3.3 查看进程 3.4 通过系统调用获取进程标示符 3.5 进程状态 在了解进程概念前我们还得了解下冯诺…...
上课睡觉(2023寒假每日一题 4)
有 NNN 堆石子,每堆的石子数量分别为 a1,a2,…,aNa_1,a_2,…,a_Na1,a2,…,aN。 你可以对石子堆进行合并操作,将两个相邻的石子堆合并为一个石子堆,例如,如果 a[1,2,3,4,5]a[1,2,3,4,5]a[1,2,3,4,5],合并第 2,32…...

【Selenium学习】Selenium 中常用的基本方法
1.send_keys 方法模拟键盘键入此方法类似于模拟键盘键入。以在百度首页搜索框输入“Selenium”为例,代码如下:# _*_ coding:utf-8 _*_ """ name:zhangxingzai date:2023/2/13 form:《Selenium 3Python 3自动化测试项目实战》 …...

python练习——简化路径
项目场景: 给你一个字符串 path ,表示指向某一文件或目录的 Unix 风格 绝对路径 (以 /开头),请你将其转化为更加简洁的规范路径。在 Unix 风格的文件系统中,一个点(.)表示当前目录本…...
2023新华为OD机试题 - 火星文计算2(JavaScript) | 刷完必过
火星文计算 2 题目 已知火星人使用的运算符号为#;$ 其与地球人的等价公式如下 x#y=4*x+3*y+2 x$y=2*x+y+3 x y是无符号整数 地球人公式按照 c 语言规则进行计算 火星人公式中#符优先级高于$ 相同的运算符按从左到右的顺序运算 输入 火星人字符串表达式结尾不带回车换行 输入…...

前端插件重磅来袭
“你值得拥有”专栏系列上新啦,今日推出“手写前端插件”项目,作为一个前端中高级工程师,手写前端树形菜单插件、弹出层插件、日历插件、分页插件、选项卡插件、进度条插件等是必备的技能,让你的前端技术百尺竿头更进一步…...

深入工厂|高精密多层板是如何被智造出来的?
或许有很多人从网络上见过各种教程,告诉你单层板是什么,多层板是什么,他们该如何做出来,但是在具体制造时却全凭想象,今天,就让我们来实地看看,精密的多层板是如何被制造出来的!今天…...
代理模式动态代理
什么是代理模式? 代理模式是开发中常见的一种设计模式,使用代理模式可以很好的对程序进行横向扩展。代理,顾名思义就是一个真实对象会存在一个代理对象,并且代理对象可以替真实对象完成相应操作,外部通过代理对象来访…...
Mysql之二进制日志
目录 二进制日志 12-37 二进制日志格式 基于行的二进制日志 基于语句的二进制日志 混合格式二进制日志 复制日志 12-42 故障安全 (Crash-Safe) 复制 多源复制 二进制日志 12-37 二进制日志: • 包含数据和模式更改及其时间戳 – 基于语句 或 基于行 的日志…...

kail工具的使用--- cewl
1.介绍 Cewl是一款采用Ruby开发的应用程序,可以给他的爬虫指定URL地址和爬取深度,还可以添加外部链接,接下来Cewl会给你返回一个字典文件,你可以把字典用到类似John the Ripper这样的密码破解工具中。 2.使用 输入以下命令之后…...
【蓝桥杯集训1】前缀和专题(2 / 5)
目录 前缀和模板 !3956. 截断数组 - 前缀和枚举 前缀和模板 活动 - AcWing import java.util.*;class Main {static int N100010;static int[] anew int[N],snew int[N];public static void main(String[] args){Scanner scnew Scanner(System.in);int nsc.nex…...

基于模块联邦的微前端实现方案
一、 微前端应用案例概述 当前案例中包含三个微应用,分别为 Marketing、Authentication 和 Dashboard Marketing:营销微应用,包含首页组件和价格组件 Authentication:身份验证微应用,包含登录组件 Dashboard&#x…...

【单目标优化算法】食肉植物优化算法(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
ANTLR4入门学习(四)
ANTLR4入门学习(四)一、设计语法1.语法2.ANTLR核心标记3.常见计算机语言模式4.左右递归5.识别常见的语法结构5.1 匹配标识符5.2 匹配数字5.3 匹配字符串常量5.4 匹配注释和空白字符5.5 基础的语法规则5.6 划定词法分析器和语法分析器的界线一、设计语法 …...
Android okhttp3中发送websocket消息,并通过mockwebserver将一个安卓设备模拟成服务器接发消息
websocket 提供了客户端和服务端的长链接,允许客户端和服务端双向发送消息 okhttp 提供了使用websocket 相关接口议。同时为方便单元测试,又提供了mockwebserver可以把一个安卓客户端作为服务端接受消息。 websocket使用 权限 <uses-permission an…...

MySQL系统变量和自定义变量
1 系统变量1.1 查看系统变量可以使用以下命令查看 MySQL 中所有的全局变量信息。SHOW GLOBAL VARIABLES; MySQL 中的系统变量以两个“”开头。global 仅仅用于标记全局变量;session 仅仅用于标记会话变量;首先标记会话变量,如果会话变量不存在…...

基于Python来爬取某音动态壁纸,桌面更香了!
至于小伙伴们想要这个封图,我也没有。不过继续带来一波靓丽壁纸,而且是动态的,我的桌面壁纸又换了:每天换着花样欣赏一波波动态壁纸桌面立刻拥有了高颜值,简直跟刷美女短视频一样啊。对的,这些动态壁纸就是…...

[数据库]表的约束
●🧑个人主页:你帅你先说. ●📃欢迎点赞👍关注💡收藏💖 ●📖既选择了远方,便只顾风雨兼程。 ●🤟欢迎大家有问题随时私信我! ●🧐版权:本文由[你帅…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...

【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...