消息消费过程
前言
本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。
消费者与消费组
Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。
开发流程
- 配置consumer参数并创建consumer实例;
- 订阅主题;
- 拉取消息并消费;
- 提交消费偏移量;
- 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}
主题与分区
每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。
例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。
反序列化
consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。
interface Deserializer {T deserialize(String topic, byte[] data);void close();
}
主题订阅
- 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
- 订阅多个特定主题, subscribe(collection);
- 订阅某种模式的主题, subscribe(pattern);
- 订阅某个主题的特定partition, assign(partition);
- 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;
消息获取
- 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
- 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
- 拉模式: client主动发起消息请求, 从server端拉取消息;
- 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)
内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。
位移提交
位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
- 消息是按照partition存储的;
- 消息写入partition时, offset单调递增;
- 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;
因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;
提交方式 | 说明 | 优缺点 |
---|---|---|
自动提交 | (默认方式) Kafka Client周期性地提交偏移量 | 优点是简单, 确定是重复消费和丢失风险 |
手动提交 | 由用户主动提交偏移量 | 优点是可细粒度管理, 缺点是相对复杂 |
自动提交
自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
- consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
- 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;
手动提交
也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |
指定offset
消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?
-
由参数auto.offset.reset设定默认行为
| 参数值 | 行为 |
|----|----|
| earliest | 从分区第一条消息的offset开始 |
| latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
| none | 程序逻辑自定义, 如果未设置则抛出异常 | -
程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。
-
由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。
再平衡
再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。
Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。
消费者拦截器
Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。
拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。
相关文章:

消息消费过程
前言 本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。 消费者与消费组 Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式…...

使用Lychee搭建个人图片存储系统并进行远程访问设置实现公网访问本地私人图床
文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站,可以看做是云存储的一部分,既可…...

12-2- DCGAN -简单网络-卷积网络
功能 随机噪声→生成器→MINIST图像。 训练方法 0 损失函数:gan的优化目标是一个对抗损失,是二分类问题,用BCELoss 1 判别器的训练,首先固定生成器参数不变,其次判别器应当将真实图像判别为1,生成图像判别为0 loss=loss(real_out, 1)+loss(fake_out, 0) 2 生成器的…...

Redis持久化策略之RDB与AOF
文章目录 1.RDB1)基本介绍2)自动触发3)手动触发4)RDB文件5)优点缺点 2.AOF1)基本介绍2)使用方式3)工作流程4)重写机制5)AOF文件6)优点缺点 3.RDB AOF 我们都知道,redis 是一个基于内存的数据库。基于内存的好处是访问速度快,缺点是“不持久”——当数据…...

Python学习笔记--初识 Python 正则表达式
初识 Python 正则表达式 正则表达式是一个特殊的字符序列,用于判断一个字符串是否与我们所设定的字符序列是否匹配,也就是说检查一个字符串是否与某种模式匹配。 Python 自 1.5 版本起增加了re 模块,它提供 Perl 风格的正则表达式模式。re 模块使 Python 语言拥有全部的正…...

webAPP基础学习
###视觉基础 part-I ####1.面试中常见的像素问题 >什么是像素? *1.什么是px? px-虚拟像素,css像素的单位 px是一个相对单位,相对于设备像素而言 >相对性 a.相对于同一个设备,css像素的可变的 css像素物理像素>会受到缩放的影响 css像素缩放倍数*单个物理像…...

RIP路由信息协议
RIP路由信息协议(Routing Information Protocol) 最先得到广泛应用的协议,最大优点是简单要求网络中的每个路由器都要维护一张表,表中记录了从它自己到其他每一个目的网络的距离RIP是应用层协议,它在传输层使用UDP,RIP报文作为UD…...

kubernetes 高可用集群
目录 一、haproxy负载均衡 二、pacemaker高可用 三、部署control-plane 四、部署worker node 实验环境 主机名 IP 角色 docker 192.168.67.10 harbor k8s1 192.168.67.11 control-plane k8s2 192.168.67.12 control-plane k8s3 192.168.67.13 control-plane k8s…...

java实现插入排序
图解 以下是Java实现插入排序的代码: public class InsertionSort {public static void main(String[] args) {int[] arr {5, 2, 4, 6, 1, 3};insertionSort(arr);System.out.println(Arrays.toString(arr)); // output: [1, 2, 3, 4, 5, 6]}public static void i…...

深度学习之基于YoloV5血红细胞检测识别系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习已经在许多领域中得到了广泛的应用,包括医疗健康领域。其中,YOLO(You O…...

8、可视化高斯滤波器并完成高斯滤波
本节一起绘制一个可视化的高斯滤波器,同时对一个彩色图像增加高斯噪声,最后通过一个高斯滤波器对图像进行降噪处理。 就像上节说的那样,滤波不是学习重点,下面通过实操了解下原理即可。 可视化高斯滤波器 高斯滤波器符合高斯分布,并且是二维高斯分布,这一点在上一节高斯…...

Linux MMC子系统 - 5.eMMC 5.1工作模式-引导模式
By: Ailson Jack Date: 2023.11.19 个人博客:http://www.only2fire.com/ 本文在我博客的地址是:http://www.only2fire.com/archives/164.html,排版更好,便于学习,也可以去我博客逛逛,兴许有你想要的内容呢。…...

2342. 数位和相等数对的最大和 --力扣 --JAVA
题目 给你一个下标从 0 开始的数组 nums ,数组中的元素都是 正 整数。请你选出两个下标 i 和 j(i ! j),且 nums[i] 的数位和 与 nums[j] 的数位和相等。 请你找出所有满足条件的下标 i 和 j ,找出并返回 nums[i] num…...

linux如何重置root密码
目录 当我们想要重置root管理员密码时,我们可以有两种方法进行: 方法一、init方法 1、重启系统,在下图所示界面按e键 2、随后进入以下界面,、将ro修改为rw,在行末尾添加init/bin/sh。编辑 3、随后按Ctrlx启动到s…...

Java 类之 java.util.Properties
Java 类之 java.util.Properties 文章目录 Java 类之 java.util.Properties一、简介二、主要功能1、存储键值对2、读取文件与属性代码示例运行结果截图 3、设置属性并保存文件代码示例结果截图 4、遍历属性代码示例运行结果 关联博客:《基于 Java 列举和说明常用的外…...

我遇到的bug(活动)
目录 方向一:身为程序员遇到过的奔溃瞬间 方向二:如何解决遇到的奔溃瞬间 方向三:在解决完后获得的收获和体会 方向一:身为程序员遇到过的奔溃瞬间 在一个项目中,我负责实现一个复杂的图像处理算法。经过几天的努力…...

MIB 6.1810实验Xv6 and Unix utilities(3)pingpong
Mit6.S081-实验1-Xv6 and Unix utilities-pingpong问题_Isana_Yashiro的博客-CSDN博客 Write a user-level program that uses xv6 system calls to ping-pong a byte between two processes over a pair of pipes, one for each direction. The parent should send a byte to…...

压力测试总共需要几个步骤?思路总结篇
在运维工作中,压力测试是一项很重要的工作。比如在一个网站上线之前,能承受多大访问量、在大访问量情况下性能怎样,这些数据指标好坏将会直接影响用户体验。今天我们就来深入了解下压力测试! 1、首先,什么是压力测试&…...

03_面向对象高级_多态
多态 1. 什么是多态? “多态” 是在 “继承” 的基础上实现的一种现象,具体表现为:对象多态、行为多态。 public class HelloWorld {public static void main(String[] args) {// 1. 对象多态Human h1 new Student();Human h2 new Teach…...

【Kingbase FlySync】界面化管控平台:2.配置数据库同步之KES>KES
【Kingbase FlySync】界面化管控平台:3.配置数据库同步之KES->KES 部署KES数据库到KES数据库同步服务1.登录KFS管理平台2.开始配置数据节点信息(1)配置node1数据节点(2)配置node2数据节点 3.KFS拓扑图配置4.开始部署5.启动同步程序并查验是否运行正常 测试同步1.从node1数据…...

企业计算机服务器中了mallox勒索病毒怎么解决,勒索病毒解密文件恢复
随着科技技术的不断发展,网络技术得到了快速提升,但网络安全威胁也不断增加,近期,云天数据恢复中心陆续接到很多企业的求助信息,企业的计算机服务器遭到了mallox勒索病毒攻击,导致企业的所有业务中断&#…...

Sonar生成PDF错误Can‘t get Compute Engine task status.Retry..... HTTP error: 401
报错及修改: 报错:INFO: Can’t get Compute Engine task status.Retry… org.sonarqube.ws.connectors.ConnectionException: HTTP error: 401, msg: , query: org.apache.commons.httpclient.methods.GetMethod7a021f49 ERROR: Problem generating PD…...

storage和正则表达式
一、Storage 1.认识Storage WebStorage主要提供了一种机制,可以让浏览器提供一种比cookie更直观的key、value存储方式: localStorage:本地存储,提供的是一种永久性的存储方法,在关闭掉网页重新打开时,存…...

【数据结构】图的广度优先遍历
一.广度优先遍历的基本思想 (1)访问顶点v; (2)依次访问v的各个未被访问的邻接点v1,v2,v3……,vk; (3)分别从v1,v2,v3……...

AM@函数展开成幂级数@间接法@常用麦克劳林幂级数展开公式
文章目录 间接法推导幂级数展开常用麦克劳林幂级数展开公式应用例例例 间接法推导幂级数展开 已知函数的幂级数展开公式间接推导其他函数幂级数 使用原始的推导公式推导函数的幂级数展开是繁琐不便的,需要分别计算各项系数 a n f ( n ) ( 0 ) n ! a_{n}\frac{f^{(n)}(0)}{n!}…...

LeetCode994.腐烂的橘子
看完题我觉得这不是和上一道岛屿的题一样简单嘛,然后写了将近2个小时才写出来,我的思路就是,用check()先对grid检查一下,是否有以下情况: (如果有1的周围都是空,则这个位置用不腐烂,…...

【开源】基于Vue和SpringBoot的康复中心管理系统
项目编号: S 056 ,文末获取源码。 \color{red}{项目编号:S056,文末获取源码。} 项目编号:S056,文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 普通用户模块2.2 护工模块2.3 管理员…...

【音视频基础】AVI文件格式
AVI文件采用的是RIFF文件结构方式。波形音频wave,MIDI和数字视频AVI都采用这种格式存储。 AVI文件的整体结构如下图所示 构造RIFF文件的基本单元叫做数据块(Chunk),每个数据块包含3个部分 4字节的数据块标记(或者叫…...

图书馆整理I(从尾到头打印列表),剑指offer,力扣
目录 题目地址: 我们直接看题解吧: 解题方法: 难度分析: 审题目事例提示: 解题思路(辅助栈): 代码(递归): 代码(列表插入): 相似题目对…...

C++编写的多线程自动爬虫程序
目录 引言 一、程序的设计 二、程序的实现 三、程序的测试 四、优化与改进 五、代码示例 总结 引言 随着互联网的快速发展,网络爬虫程序已经成为数据采集、信息处理的重要工具。C作为一种高效的编程语言,具有高效的并发处理能力和丰富的网络编程…...